[hornetq-commits] JBoss hornetq SVN: r12141 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Feb 17 13:45:26 EST 2012


Author: jbertram
Date: 2012-02-17 13:45:25 -0500 (Fri, 17 Feb 2012)
New Revision: 12141

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-02-17 17:29:49 UTC (rev 12140)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-02-17 18:45:25 UTC (rev 12141)
@@ -1025,7 +1025,7 @@
 
       public void afterRollback(final Transaction tx)
       {
-         if (tx.getState() == State.PREPARED && pageTransaction != null)
+         if (pageTransaction != null)
          {
             pageTransaction.rollback();
          }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2012-02-17 17:29:49 UTC (rev 12140)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2012-02-17 18:45:25 UTC (rev 12141)
@@ -119,6 +119,138 @@
       locator = createInVMNonHALocator();
    }
 
+    public void testPageCleanup() throws Exception
+    {
+        clearData();
+        // HORNETQ-859 regression test: a rolled-back send between two committed batches must not leave the store paging.
+        Configuration config = createDefaultConfig();
+
+        config.setJournalSyncNonTransactional(false);
+
+        server =
+                createServer(true, config,
+                        PagingTest.PAGE_SIZE,
+                        PagingTest.PAGE_MAX,
+                        new HashMap<String, AddressSettings>());
+
+        server.start();
+
+        final int numberOfMessages = 5000;
+
+        locator = createInVMNonHALocator();
+
+        locator.setBlockOnNonDurableSend(true);
+        locator.setBlockOnDurableSend(true);
+        locator.setBlockOnAcknowledge(true);
+
+        sf = createSessionFactory(locator);
+
+        ClientSession session = sf.createSession(false, false, false);
+
+        session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+        ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+        ClientMessage message = null;
+        // Build one MESSAGE_SIZE-byte payload; the same array is written into the body of every message sent below.
+        byte[] body = new byte[MESSAGE_SIZE];
+
+        ByteBuffer bb = ByteBuffer.wrap(body);
+
+        for (int j = 1; j <= MESSAGE_SIZE; j++)
+        {
+            bb.put(getSamplebyte(j));
+        }
+        // Batch 1: send numberOfMessages messages, committing whenever i is a multiple of 1000 (including i == 0).
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent " + numberOfMessages + " messages.");
+        // In a fresh session, send a single message and roll it back instead of committing it.
+        session = sf.createSession(false, false, false);
+        producer = session.createProducer(PagingTest.ADDRESS);
+        producer.send(session.createMessage(true));
+        session.rollback();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent (and rolled-back) 1 message.");
+        // Batch 2: send another numberOfMessages messages with the same commit cadence as batch 1.
+        session = sf.createSession(false, false, false);
+        producer = session.createProducer(PagingTest.ADDRESS);
+
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+        Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+        session = sf.createSession(false, false, false);
+
+        session.start();
+
+        ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+        ClientMessage msg = null;
+        // Only the two committed batches are expected on the queue; receive and acknowledge every one of them.
+        assertEquals(numberOfMessages * 2, queue.getMessageCount());
+        for (int i = 0; i < numberOfMessages * 2; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            //System.out.println("ack " + i);
+            // Commit the consuming session whenever i is a multiple of 500.
+            if (i % 500 == 0)
+            {
+                session.commit();
+            }
+        }
+        session.commit();
+        consumer.close();
+        session.close();
+
+        sf.close();
+
+        locator.close();
+
+        assertEquals(0, queue.getMessageCount());
+        // Poll for up to 10 seconds for the paging store to report it is no longer paging, then assert it.
+        long timeout = System.currentTimeMillis() + 10000;
+        while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+        {
+            Thread.sleep(100);
+        }
+        assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+    }
+
    public void testPreparePersistent() throws Exception
    {
       clearData();



More information about the hornetq-commits mailing list