Author: jbertram
Date: 2012-02-17 15:14:46 -0500 (Fri, 17 Feb 2012)
New Revision: 12143
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 20:12:28 UTC (rev 12142)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 20:14:46 UTC (rev 12143)
@@ -1012,7 +1012,7 @@
public void afterRollback(final Transaction tx)
{
- if (tx.getState() == State.PREPARED && pageTransaction != null)
+ if (pageTransaction != null)
{
pageTransaction.rollback();
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 20:12:28 UTC (rev 12142)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 20:14:46 UTC (rev 12143)
@@ -127,6 +127,140 @@
super.tearDown();
}
+ public void testPageCleanup() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server =
+ createServer(true, config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfMessages = 5000;
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ final int MESSAGE_SIZE = 1024;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ producer.send(session.createMessage(true));
+ session.rollback();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent (and rolled-back) 1 message.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ ClientMessage msg = null;
+
+ assertEquals(numberOfMessages * 2, queue.getMessageCount());
+ for (int i = 0; i < numberOfMessages * 2; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ //System.out.println("ack " + i);
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ consumer.close();
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 10000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+
public void testPreparePersistent() throws Exception
{
clearData();
Show replies by date