Author: clebert.suconic(a)jboss.com
Date: 2011-04-05 12:17:19 -0400 (Tue, 05 Apr 2011)
New Revision: 10455
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-6237 - delaying delete of large messages when sent through paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java 2011-04-05 14:47:00
UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java 2011-04-05 16:17:19
UTC (rev 10455)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.persistence.StorageManager;
/**
*
@@ -29,7 +30,7 @@
void write(PagedMessage message) throws Exception;
- List<PagedMessage> read() throws Exception;
+ List<PagedMessage> read(StorageManager storage) throws Exception;
void setLiveCache(LivePageCache pageCache);
@@ -43,5 +44,5 @@
void close() throws Exception;
- boolean delete() throws Exception;
+ boolean delete(PagedMessage[] messages) throws Exception;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-05
14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -108,7 +108,7 @@
System.out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(sm);
page.close();
int msgID = 0;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -34,6 +34,8 @@
void setMessages(PagedMessage[] messages);
+ PagedMessage[] getMessages();
+
/**
* If this cache is still being updated
* @return
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -143,7 +143,15 @@
this.isLive = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+ */
+ public PagedMessage[] getMessages()
+ {
+ return messages.toArray(new PagedMessage[messages.size()]);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -133,6 +133,14 @@
return "PageCacheImpl::page=" + page.getPageId() + "
numberOfMessages = " + messages.length;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+ */
+ public PagedMessage[] getMessages()
+ {
+ return messages;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -143,6 +143,74 @@
return getPageCache(pos.getPageNr());
}
+ public PageCache getPageCache(final long pageId)
+ {
+ try
+ {
+ boolean needToRead = false;
+ PageCache cache = null;
+ synchronized (softCache)
+ {
+ if (pageId > pagingStore.getCurrentWritingPage())
+ {
+ return null;
+ }
+
+ cache = softCache.get(pageId);
+ if (cache == null)
+ {
+ if (!pagingStore.checkPage((int)pageId))
+ {
+ return null;
+ }
+
+ cache = createPageCache(pageId);
+ needToRead = true;
+ // anyone reading from this cache will have to wait for the read to finish first
+ // we also want only one thread reading this cache
+ cache.lock();
+ softCache.put(pageId, cache);
+ }
+ }
+
+ // Reading is done outside of the synchronized block, however
+ // the page stays locked until the entire reading is finished
+ if (needToRead)
+ {
+ Page page = null;
+ try
+ {
+ page = pagingStore.createPage((int)pageId);
+
+ page.open();
+
+ List<PagedMessage> pgdMessages = page.read(storageManager);
+ cache.setMessages(pgdMessages.toArray(new
PagedMessage[pgdMessages.size()]));
+ }
+ finally
+ {
+ try
+ {
+ if (page != null)
+ {
+ page.close();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ }
+ cache.unlock();
+ }
+ }
+
+ return cache;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't complete paging due to an IO
Exception on Paging - " + e.getMessage(), e);
+ }
+ }
+
public void addPageCache(PageCache cache)
{
synchronized (softCache)
@@ -337,9 +405,31 @@
{
for (Page depagedPage : depagedPages)
{
- depagedPage.delete();
+ PageCache cache;
+ PagedMessage[] pgdMessages;
synchronized (softCache)
{
+ cache = softCache.remove((long)depagedPage.getPageId());
+ }
+
+ if (cache == null)
+ {
+ // The page is no longer in the cache.
+ // We need to read the page file before deleting it
+ // to make sure we remove any pending large messages
+ depagedPage.open();
+ List<PagedMessage> pgdMessagesList =
depagedPage.read(storageManager);
+ depagedPage.close();
+ pgdMessages = pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
+ }
+ else
+ {
+ pgdMessages = cache.getMessages();
+ }
+
+ depagedPage.delete(pgdMessages);
+ synchronized (softCache)
+ {
softCache.remove((long)depagedPage.getPageId());
}
}
@@ -422,79 +512,6 @@
}
- private PageCache getPageCache(final long pageId)
- {
- try
- {
- boolean needToRead = false;
- PageCache cache = null;
- synchronized (softCache)
- {
- if (pageId > pagingStore.getCurrentWritingPage())
- {
- return null;
- }
-
- cache = softCache.get(pageId);
- if (cache == null)
- {
- if (!pagingStore.checkPage((int)pageId))
- {
- return null;
- }
-
- cache = createPageCache(pageId);
- needToRead = true;
- // anyone reading from this cache will have to wait reading to finish first
- // we also want only one thread reading this cache
- cache.lock();
- softCache.put(pageId, cache);
- }
- }
-
- // Reading is done outside of the synchronized block, however
- // the page stays locked until the entire reading is finished
- if (needToRead)
- {
- Page page = null;
- try
- {
- page = pagingStore.createPage((int)pageId);
-
- page.open();
-
- List<PagedMessage> pgdMessages = page.read();
-
- for (PagedMessage pdgMessage : pgdMessages)
- {
- pdgMessage.initMessage(storageManager);
- }
- cache.setMessages(pgdMessages.toArray(new
PagedMessage[pgdMessages.size()]));
- }
- finally
- {
- try
- {
- if (page != null)
- {
- page.close();
- }
- }
- catch (Throwable ignored)
- {
- }
- cache.unlock();
- }
- }
-
- return cache;
- }
- catch (Exception e)
- {
- throw new RuntimeException("Couldn't complete paging due to an IO
Exception on Paging - " + e.getMessage(), e);
- }
- }
-
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -28,6 +28,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.utils.DataConstants;
/**
@@ -101,7 +102,7 @@
this.pageCache = pageCache;
}
- public List<PagedMessage> read() throws Exception
+ public List<PagedMessage> read(StorageManager storage) throws Exception
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
@@ -140,6 +141,7 @@
// constraint was already checked
throw new IllegalStateException("Internal error, it wasn't
possible to locate END_BYTE " + b);
}
+ msg.initMessage(storage);
messages.add(msg);
}
else
@@ -218,13 +220,29 @@
file.close();
}
- public boolean delete() throws Exception
+ public boolean delete(final PagedMessage[] messages) throws Exception
{
if (storageManager != null)
{
storageManager.pageDeleted(storeName, pageId);
}
+ if (messages != null)
+ {
+ for (PagedMessage msg : messages)
+ {
+ if (msg.getMessage().isLargeMessage())
+ {
+ LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
+
+ // Remember: we cannot call delete directly here,
+ // because the large message may be linked to another message,
+ // or it may still be being delivered even though it has already been acked
+ lmsg.decrementDelayDeletionCount();
+ }
+ }
+ }
+
try
{
if (suspiciousRecords)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -79,9 +79,11 @@
{
if (largeMessageLazyData != null)
{
- message = storage.createLargeMessage();
+ LargeServerMessage lgMessage = storage.createLargeMessage();
+ message = lgMessage;
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
+ lgMessage.incrementDelayDeletionCount();
largeMessageLazyData = null;
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -462,13 +462,12 @@
currentPage = createPage(currentPageId);
currentPage.open();
- List<PagedMessage> messages = currentPage.read();
+ List<PagedMessage> messages = currentPage.read(storageManager);
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
for (PagedMessage msg : messages)
{
- msg.initMessage(storageManager);
pageCache.addLiveMessage(msg);
}
@@ -646,7 +645,7 @@
{
stopPaging();
returnPage.open();
- returnPage.delete();
+ returnPage.delete(null);
// This will trigger this address to exit the page mode,
// and this will make HornetQ start using the journal again
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -533,7 +533,7 @@
{
if (deletePages)
{
- page.delete();
+ page.delete(null);
}
}
else
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
@@ -139,17 +140,16 @@
}
}
-
public void testLargeBufferTransacted() throws Exception
{
doTestLargeBuffer(true);
}
-
+
public void testLargeBufferNotTransacted() throws Exception
{
doTestLargeBuffer(false);
}
-
+
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
@@ -162,10 +162,10 @@
{
Configuration config = createDefaultConfig(isNetty());
config.setJournalFileSize(journalsize);
-
+
config.setJournalBufferSize_AIO(10 * 1024);
config.setJournalBufferSize_NIO(10 * 1024);
-
+
server = createServer(true, config);
server.start();
@@ -179,11 +179,10 @@
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
Message clientFile = session.createMessage(true);
- for (int i = 0 ; i < messageSize; i++)
+ for (int i = 0; i < messageSize; i++)
{
clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
}
-
producer.send(clientFile);
@@ -197,26 +196,24 @@
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
ClientMessage msg1 = consumer.receive(5000);
assertNotNull(msg1);
-
+
Assert.assertNotNull(msg1);
-
- for (int i = 0 ; i < messageSize; i++)
+
+ for (int i = 0; i < messageSize; i++)
{
- //System.out.print(msg1.getBodyBuffer().readByte() + " ");
- //if (i % 100 == 0) System.out.println();
- assertEquals("position = " + i, getSamplebyte(i),
msg1.getBodyBuffer().readByte());
+ // System.out.print(msg1.getBodyBuffer().readByte() + " ");
+ // if (i % 100 == 0) System.out.println();
+ assertEquals("position = " + i, getSamplebyte(i),
msg1.getBodyBuffer().readByte());
}
-
+
msg1.acknowledge();
-
+
consumer.close();
-
-
+
if (transacted)
{
session.commit();
}
-
session.close();
@@ -880,7 +877,6 @@
producer2.send(msg1);
-
session.commit();
ClientMessage msg2 = consumer2.receive(10000);
@@ -939,9 +935,8 @@
server.start();
-
locator.setMinLargeMessageSize(200);
-
+
locator.setCacheLargeMessagesClient(true);
ClientSessionFactory sf = locator.createSessionFactory();
@@ -968,19 +963,19 @@
session.commit();
compareString(messageSize, msgReceived);
-
+
msgReceived.getBodyBuffer().readerIndex(0);
-
+
producer.send(msgReceived);
session.commit();
-
+
ClientMessage msgReceived2 = consumer.receive(10000);
msgReceived2.acknowledge();
compareString(messageSize, msgReceived2);
-
+
session.commit();
session.close();
@@ -1016,7 +1011,7 @@
assertNotNull(msg);
for (long i = 0; i < messageSize; i++)
{
- Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
+ Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
}
}
@@ -2365,7 +2360,6 @@
server.start();
-
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
@@ -2469,7 +2463,6 @@
server.start();
-
locator.setMinLargeMessageSize(100 * 1024);
ClientSessionFactory sf = locator.createSessionFactory();
@@ -2629,18 +2622,17 @@
try
{
LargeServerMessageImpl fileMessage = new
LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-
+
fileMessage.setMessageID(1005);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
{
fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
}
-
+
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
-
fileMessage.releaseResources();
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
@@ -2821,6 +2813,158 @@
}
+ public void testPageOnLargeMessageMultipleQueues() throws Exception
+ {
+ Configuration config = createDefaultConfig(isNetty());
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ HashMap<String, AddressSettings> map = new HashMap<String,
AddressSettings>();
+
+ AddressSettings value = new AddressSettings();
+ map.put(LargeMessageTest.ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ final int numberOfBytes = 1024;
+
+ final int numberOfBytesBigMessage = 400000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
+
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writerIndex(0);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
+ }
+
+ producer.send(message);
+ }
+
+ ClientMessage clientFile = createLargeClientMessage(session,
numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ server.stop();
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer =
session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+ session.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+ Assert.assertNotNull(messageLarge);
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ messageLarge.acknowledge();
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
+ }
+
+ if (i < 4)
+ session.rollback();
+ else
+ session.commit();
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -98,7 +98,7 @@
file.open();
impl = new PageImpl(new SimpleString("something"), new
NullStorageManager(), factory, file, 10);
- List<PagedMessage> msgs = impl.read();
+ List<PagedMessage> msgs = impl.read(new NullStorageManager());
Assert.assertEquals(numberOfElements, msgs.size());
@@ -115,7 +115,7 @@
.array());
}
- impl.delete();
+ impl.delete(null);
Assert.assertEquals(0, factory.listFiles(".page").size());
@@ -170,7 +170,7 @@
file.open();
impl = new PageImpl(new SimpleString("something"), new
NullStorageManager(), factory, file, 10);
- List<PagedMessage> msgs = impl.read();
+ List<PagedMessage> msgs = impl.read(new NullStorageManager());
Assert.assertEquals(numberOfElements, msgs.size());
@@ -187,7 +187,7 @@
.array());
}
- impl.delete();
+ impl.delete(null);
Assert.assertEquals(0, factory.listFiles("page").size());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -93,7 +93,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-05
14:47:00 UTC (rev 10454)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-05
16:17:19 UTC (rev 10455)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.paging.impl;
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -58,6 +57,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -286,7 +286,7 @@
page.open();
- List<PagedMessage> msg = page.read();
+ List<PagedMessage> msg = page.read(new NullStorageManager());
Assert.assertEquals(numMessages, msg.size());
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -374,7 +374,7 @@
page.open();
- List<PagedMessage> msg = page.read();
+ List<PagedMessage> msg = page.read(new NullStorageManager());
page.close();
@@ -399,9 +399,9 @@
newPage.open();
- Assert.assertEquals(1, newPage.read().size());
+ Assert.assertEquals(1, newPage.read(new NullStorageManager()).size());
- newPage.delete();
+ newPage.delete(null);
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -421,7 +421,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
Assert.assertEquals(1, msgs.size());
@@ -603,7 +603,7 @@
for (Page page : readPages)
{
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
for (PagedMessage msg : msgs)
@@ -678,7 +678,7 @@
page.open();
- List<PagedMessage> msgs = page.read();
+ List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
@@ -696,7 +696,7 @@
}
lastPage.open();
- List<PagedMessage> lastMessages = lastPage.read();
+ List<PagedMessage> lastMessages = lastPage.read(new NullStorageManager());
lastPage.close();
Assert.assertEquals(1, lastMessages.size());
@@ -856,7 +856,7 @@
if (page != null)
{
page.open();
- List<PagedMessage> messages = page.read();
+ List<PagedMessage> messages = page.read(new
NullStorageManager());
for (PagedMessage pgmsg : messages)
{
@@ -868,7 +868,7 @@
}
page.close();
- page.delete();
+ page.delete(null);
}
else
{