[hornetq-commits] JBoss hornetq SVN: r10243 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Feb 22 20:30:43 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-22 20:30:42 -0500 (Tue, 22 Feb 2011)
New Revision: 10243
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-5973 - fixing empty files after paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-02-23 01:30:42 UTC (rev 10243)
@@ -72,6 +72,8 @@
Page createPage(final int page) throws Exception;
+ boolean checkPage(final int page) throws Exception;
+
PagingManager getPagingManager();
PageCursorProvider getCursorProvier();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-23 01:30:42 UTC (rev 10243)
@@ -438,6 +438,11 @@
cache = softCache.get(pageId);
if (cache == null)
{
+ if (!pagingStore.checkPage((int)pageId))
+ {
+ return null;
+ }
+
cache = createPageCache(pageId);
needToRead = true;
// anyone reading from this cache will have to wait reading to finish first
@@ -464,9 +469,7 @@
{
pdgMessage.initMessage(storageManager);
}
-
cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
-
}
finally
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-23 01:30:42 UTC (rev 10243)
@@ -319,13 +319,8 @@
PageCache cache = cursorProvider.getPageCache(pos);
- if (cache == null)
+ if (cache == null || (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()))
{
- return null;
- }
-
- if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
- {
retPos = pos.nextPage();
cache = cursorProvider.getPageCache(retPos);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-23 01:30:42 UTC (rev 10243)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.impl;
+import java.io.File;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.HashSet;
@@ -560,6 +561,13 @@
{
return currentPage;
}
+
+ public boolean checkPage(final int pageNumber)
+ {
+ String fileName = createFileName(pageNumber);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, 1);
+ return file.exists();
+ }
public Page createPage(final int pageNumber) throws Exception
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-23 01:30:42 UTC (rev 10243)
@@ -29,6 +29,7 @@
import junit.framework.AssertionFailedError;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
@@ -454,7 +455,6 @@
try
{
server.stop();
- // System.exit(-1);
}
catch (Throwable ignored)
{
@@ -2787,7 +2787,154 @@
}
}
}
+
+
+ public void testPageAndDepageRapidly() throws Exception
+ {
+ boolean persistentMessages = true;
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+ config.setJournalFileSize(10 * 1024 * 1024);
+
+ HornetQServer server = createServer(true,
+ config,
+ 512 * 1024,
+ 1024 * 1024,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 51527;
+
+ final int numberOfMessages = 200;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ final ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread consumeThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionConsumer = null;
+ try
+ {
+ sessionConsumer = sf.createSession(false, false);
+ sessionConsumer.start();
+
+ ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
+ System.out.println("Message " + i + " consumed");
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ if (i % 20 == 0)
+ {
+ System.out.println("Commit consumer");
+ sessionConsumer.commit();
+ }
+ }
+ sessionConsumer.commit();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ sessionConsumer.close();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+
+ }
+ };
+
+ consumeThread.start();
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ System.out.println("Message " + i + " sent");
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+
+ Thread.sleep(50);
+ }
+
+
+ consumeThread.join();
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+ {
+ Thread.sleep(1);
+ }
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the hornetq-commits
mailing list