Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 22:20:35 -0500 (Mon, 06 Feb 2012)
New Revision: 12091
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843 - fix on paging
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:16:10 UTC (rev 12090)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:20:35 UTC (rev 12091)
@@ -27,6 +27,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -191,7 +192,74 @@
Assert.assertEquals(i, result);
}
}
+
+ public void testExpireMessage() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+
+ session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+ final int TOTAL_MESSAGES = 1000;
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ msg.setExpiration(System.currentTimeMillis() + 1000);
+ prod.send(msg);
+ }
+
+ crash(session);
+
+ session.close();
+
+ Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+ long timeout = System.currentTimeMillis() + 60000;
+ System.out.println("Starting now");
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
+ {
+ Thread.sleep(100);
+ // Simulating what would happen on expire
+ queue.expireReferences();
+ }
+
+ try
+ {
+ assertFalse(queue.getPageSubscription().isPaging());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ locator.close();
+ }
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Show replies by date