Author: clebert.suconic(a)jboss.com
Date: 2010-10-04 12:12:30 -0400 (Mon, 04 Oct 2010)
New Revision: 9742
Added:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Removed:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
PageCursor implementation
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -40,6 +40,10 @@
SimpleString getStoreName();
AddressFullMessagePolicy getAddressFullMessagePolicy();
+
+ long getFirstPage();
+
+ long getTopPage();
long getPageSizeBytes();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -14,8 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.server.ServerMessage;
/**
* A PageCursor
@@ -27,11 +26,11 @@
public interface PageCursor
{
- Pair<PagePositionImpl, PagedMessage> moveNext();
+ Pair<PagePosition, ServerMessage> moveNext() throws Exception;
- PagePositionImpl getFirstPosition();
+ PagePosition getFirstPosition();
- void ack(PagePositionImpl position);
+ void ack(PagePosition position);
- void returnElement(PagePositionImpl position);
+ void returnElement(PagePosition position);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -13,8 +13,10 @@
package org.hornetq.core.paging.cursor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.server.ServerMessage;
/**
* The provider of Cursor for a given Address
@@ -29,6 +31,7 @@
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -42,9 +45,9 @@
PageCursor createCursor();
- PageCursor recoverCursor(PagePositionImpl position);
+ //PageCursor recoverCursor(PagePosition position);
- PagePositionImpl getAfter(PagePositionImpl pos);
+ Pair<PagePosition,ServerMessage> getAfter(PagePosition pos) throws Exception;
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -25,10 +25,15 @@
long getRecordID();
+ // TODO: this belongs somewhere else
void setRecordID(long recordID);
long getPageNr();
- long getMessageNr();
+ int getMessageNr();
+ PagePosition nextMessage();
+
+ PagePosition nextPage();
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -17,7 +17,10 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.StorageCursor;
+import org.hornetq.core.server.ServerMessage;
/**
* A PageCursorImpl
@@ -29,69 +32,99 @@
*/
public class PageCursorImpl implements PageCursor
{
-
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
private StorageCursor store;
-
+
private PagingStore pageStore;
- public PageCursorImpl(PagingStore pageStore, StorageCursor store)
+ private final PageCursorProvider cursorProvider;
+
+ private volatile PagePosition lastPosition;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageCursor store)
{
this.pageStore = pageStore;
this.store = store;
+ this.cursorProvider = cursorProvider;
}
+ // Public --------------------------------------------------------
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public Pair<PagePositionImpl, PagedMessage> moveNext()
+ public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
{
- // TODO Auto-generated method stub
- return null;
+ if (lastPosition == null)
+ {
+ lastPosition = recoverLastPosition();
+ }
+
+ Pair<PagePosition,ServerMessage> message = null;
+ do
+ {
+ message = cursorProvider.getAfter(lastPosition);
+ if (message != null)
+ {
+ lastPosition = message.a;
+ }
+ }
+ while (message != null && !match(message.b));
+
+ return message;
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void ack(PagePositionImpl position)
+ public void ack(PagePosition position)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void returnElement(PagePositionImpl position)
+ public void returnElement(PagePosition position)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
*/
- public PagePositionImpl getFirstPosition()
+ public PagePosition getFirstPosition()
{
// TODO Auto-generated method stub
return null;
}
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+ protected boolean match(ServerMessage message)
+ {
+ return true;
+ }
// Private -------------------------------------------------------
+ private PagePosition recoverLastPosition()
+ {
+ long firstPage = pageStore.getFirstPage();
+ return new PagePositionImpl(firstPage, -1);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -15,12 +15,14 @@
import java.util.List;
+import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SoftValueHashMap;
@@ -66,7 +68,7 @@
*/
public PageCursor createCursor()
{
- return null;
+ return new PageCursorImpl(this, pagingStore, null);
}
/* (non-Javadoc)
@@ -80,9 +82,29 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public PagePositionImpl getAfter(final PagePositionImpl pos)
+ public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
{
- return null;
+ PagePosition retPos = pos.nextMessage();
+
+ PageCache cache = getPageCache(pos.getPageNr());
+
+ if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ retPos = pos.nextPage();
+
+ cache = getPageCache(retPos.getPageNr());
+ if (cache == null)
+ {
+ return null;
+ }
+
+ if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ return null;
+ }
+ }
+
+ return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
}
public PageCache getPageCache(final long pageId) throws Exception
@@ -91,6 +113,11 @@
PageCacheImpl cache = null;
synchronized (this)
{
+ if (pageId > pagingStore.getNumberOfPages())
+ {
+ return null;
+ }
+
cache = softCache.get(pageId);
if (cache == null)
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -26,18 +26,16 @@
{
private long pageNr;
- private long messageNr;
+ private int messageNr;
/** ID used for storage */
private long recordID;
-
-
/**
* @param pageNr
* @param messageNr
*/
- public PagePositionImpl(long pageNr, long messageNr)
+ public PagePositionImpl(long pageNr, int messageNr)
{
super();
this.pageNr = pageNr;
@@ -71,7 +69,7 @@
/**
* @return the messageNr
*/
- public long getMessageNr()
+ public int getMessageNr()
{
return messageNr;
}
@@ -102,7 +100,17 @@
return 0;
}
}
-
+
+ public PagePosition nextMessage()
+ {
+ return new PagePositionImpl(this.pageNr, this.messageNr + 1);
+ }
+
+ public PagePosition nextPage()
+ {
+ return new PagePositionImpl(this.pageNr + 1, 0);
+ }
+
public boolean isNextSequenceOf(PagePosition pos)
{
return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -203,6 +203,16 @@
// Public --------------------------------------------------------
// PagingStore implementation ------------------------------------
+
+ public long getFirstPage()
+ {
+ return firstPageId;
+ }
+
+ public long getTopPage()
+ {
+ return currentPageId;
+ }
public SimpleString getAddress()
{
Deleted: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 23:11:44 UTC (rev 9741)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -1,136 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import java.util.HashMap;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A PageCacheTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageCacheTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString ADDRESS = new SimpleString("test-add");
-
- private HornetQServer server;
-
- private static final int PAGE_MAX = -1;
-
- private static final int PAGE_SIZE = 10 * 1024 * 1024;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testReadCache() throws Exception
- {
-
- PagingStoreImpl pageStore = (PagingStoreImpl)server.getPagingManager().getPageStore(ADDRESS);
-
- StorageManager storageManager = server.getStorageManager();
-
- final int NUM_MESSAGES = 1000;
-
- pageStore.startPaging();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- if (i % 100 == 0) System.out.println("Paged " + i);
- HornetQBuffer buffer = RandomUtil.randomBuffer(1024*1024, i + 1l);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg));
- }
-
- int numberOfPages = pageStore.getNumberOfPages();
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(pageStore, storageManager);
-
- for (int i = 0; i < numberOfPages; i++)
- {
- PageCache cache = cursorProvider.getPageCache(i + 1);
- System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
-
- }
-
- forceGC();
-
- assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- System.out.println("Cache size = " + cursorProvider.getCacheSize());
- assertEquals(numberOfPages, pageStore.getNumberOfPages());
-
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
- System.out.println("Tmp:" + getTemporaryDir());
-
- server = createServer(true,
- createDefaultConfig(),
- PAGE_SIZE,
- PAGE_MAX,
- new HashMap<String, AddressSettings>());
-
- server.start();
-
- createQueue(ADDRESS.toString(), ADDRESS.toString());
- }
-
- protected void tearDown() throws Exception
- {
- server.stop();
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java (from rev 9741, branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java)
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-04 16:12:30 UTC (rev 9742)
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PageCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString ADDRESS = new SimpleString("test-add");
+
+ private HornetQServer server;
+
+ private static final int PAGE_MAX = -1;
+
+ private static final int PAGE_SIZE = 10 * 1024 * 1024;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testReadCache() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+
+ for (int i = 0; i < numberOfPages; i++)
+ {
+ PageCache cache = cursorProvider.getPageCache(i + 1);
+ System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
+
+ }
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ System.out.println("Cache size = " + cursorProvider.getCacheSize());
+ }
+
+
+ public void testSimpleCursor() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+
+ PageCursor cursor = cursorProvider.createCursor();
+
+ Pair<PagePosition, ServerMessage> msg;
+
+ int key = 0;
+ while ((msg = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msg.b.getIntProperty("key").intValue());
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ }
+
+
+ public void testReadNextPage() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+
+ PageCache cache = cursorProvider.getPageCache(2);
+
+ assertNull(cache);
+ }
+
+ /**
+ * @param numMessages
+ * @param pageStore
+ * @throws Exception
+ */
+ private int addMessages(final int numMessages, final int messageSize) throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ return pageStore.getNumberOfPages();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
+ {
+ return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ System.out.println("Tmp:" + getTemporaryDir());
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ server = createServer(true,
+ config,
+ PAGE_SIZE,
+ PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ createQueue(ADDRESS.toString(), ADDRESS.toString());
+ }
+
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}