Author: clebert.suconic(a)jboss.com
Date: 2010-11-23 23:25:38 -0500 (Tue, 23 Nov 2010)
New Revision: 9925
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
fixing pagingFailoverTest
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-24
03:36:30 UTC (rev 9924)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-24
04:25:38 UTC (rev 9925)
@@ -1107,6 +1107,9 @@
}
}
}
+
+ // To recover positions on Iterators
+ pagingManager.processReload();
if (perfBlastPages != -1)
{
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-24
03:36:30 UTC (rev 9924)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-24
04:25:38 UTC (rev 9925)
@@ -1700,6 +1700,135 @@
}
+ public void testParialConsume() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(null, null, false, false, false, false,
0);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[1024]);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ session.start();
+ // 347 = I just picked any odd number, not rounded, to make sure it's not at the beginning of any page
+ for (int i = 0; i < 347; i++)
+ {
+ System.out.println("Received " + i);
+ ClientMessage msg = consumer.receive(5000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ Assert.assertNotNull(msg);
+ msg.acknowledge();
+ session.commit();
+ }
+
+ session.close();
+
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
+ consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ session.start();
+ for (int i = 347; i < numberOfMessages; i++)
+ {
+ System.out.println("Received " + i);
+ ClientMessage msg = consumer.receive(5000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ Assert.assertNotNull(msg);
+ msg.acknowledge();
+ session.commit();
+ }
+
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageMultipleDestinations() throws Exception
{
internalTestPageMultipleDestinations(false);