[hornetq-commits] JBoss hornetq SVN: r12142 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Feb 17 15:12:28 EST 2012


Author: jbertram
Date: 2012-02-17 15:12:28 -0500 (Fri, 17 Feb 2012)
New Revision: 12142

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-02-17 18:45:25 UTC (rev 12141)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-02-17 20:12:28 UTC (rev 12142)
@@ -1012,7 +1012,7 @@
 
       public void afterRollback(final Transaction tx)
       {
-         if (tx.getState() == State.PREPARED && pageTransaction != null)
+         if (pageTransaction != null)
          {
             pageTransaction.rollback();
          }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2012-02-17 18:45:25 UTC (rev 12141)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2012-02-17 20:12:28 UTC (rev 12142)
@@ -127,6 +127,140 @@
       super.tearDown();
    }
 
+    public void testPageCleanup() throws Exception
+    { // Added with the HORNETQ-859 change (commit log: "Page files not deleted on rollback").
+        clearData();
+        // Outline: send a committed batch, send one message and roll it back, send a second committed batch, consume everything, then wait for isPaging() to become false.
+        Configuration config = createDefaultConfig();
+
+        config.setJournalSyncNonTransactional(false);
+        // The server is created with PagingTest.PAGE_SIZE / PagingTest.PAGE_MAX and an empty address-settings map.
+        HornetQServer server =
+                createServer(true, config,
+                        PagingTest.PAGE_SIZE,
+                        PagingTest.PAGE_MAX,
+                        new HashMap<String, AddressSettings>());
+
+        server.start();
+
+        final int numberOfMessages = 5000;
+
+        locator = createInVMNonHALocator();
+        // Make sends and acknowledgements blocking on this locator.
+        locator.setBlockOnNonDurableSend(true);
+        locator.setBlockOnDurableSend(true);
+        locator.setBlockOnAcknowledge(true);
+
+        ClientSessionFactory sf = locator.createSessionFactory();
+        // NOTE(review): createSession(false, false, false) presumably means non-XA with auto-commit off for sends and acks, hence the explicit commit()/rollback() calls below - confirm against the ClientSessionFactory API.
+        ClientSession session = sf.createSession(false, false, false);
+
+        session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+        ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+        ClientMessage message = null;
+
+        final int MESSAGE_SIZE = 1024;
+
+        byte[] body = new byte[MESSAGE_SIZE];
+
+        ByteBuffer bb = ByteBuffer.wrap(body);
+        // Fill the shared 1024-byte body through the wrapping buffer, one getSamplebyte(j) value per position.
+        for (int j = 1; j <= MESSAGE_SIZE; j++)
+        {
+            bb.put(getSamplebyte(j));
+        }
+        // Batch 1: send numberOfMessages messages; commit whenever i % 1000 == 0 (this includes i == 0) and once more after the loop.
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent " + numberOfMessages + " messages.");
+        // Rolled-back send: one message is sent in a fresh session and the session is rolled back instead of committed.
+        session = sf.createSession(false, false, false);
+        producer = session.createProducer(PagingTest.ADDRESS);
+        producer.send(session.createMessage(true));
+        session.rollback();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent (and rolled-back) 1 message.");
+        // Batch 2: same send/commit pattern as batch 1, in another fresh session.
+        session = sf.createSession(false, false, false);
+        producer = session.createProducer(PagingTest.ADDRESS);
+
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent " + numberOfMessages + " messages.");
+        // Consume phase: look up the server-side queue so its message count and paging state can be asserted directly.
+        Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+        session = sf.createSession(false, false, false);
+
+        session.start();
+
+        ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+        ClientMessage msg = null;
+        // Only the two committed batches are expected on the queue; the rolled-back message must not be counted.
+        assertEquals(numberOfMessages * 2, queue.getMessageCount());
+        for (int i = 0; i < numberOfMessages * 2; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            //System.out.println("ack " + i);
+            // Commit the acknowledgements whenever i % 500 == 0 (this includes i == 0); the rest are committed after the loop.
+            if (i % 500 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        consumer.close();
+        session.close();
+
+        sf.close();
+        // NOTE(review): the server started above is never stopped in this method - confirm that tearDown() takes care of it.
+        locator.close();
+
+        assertEquals(0, queue.getMessageCount());
+        // Poll every 100 ms, for at most 10 s, until the queue's paging store stops reporting isPaging(); the final assert fails if it never does.
+        long timeout = System.currentTimeMillis() + 10000;
+        while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+        {
+            Thread.sleep(100);
+        }
+        assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+    }
+
    public void testPreparePersistent() throws Exception
    {
       clearData();



More information about the hornetq-commits mailing list