Author: clebert.suconic(a)jboss.com
Date: 2010-10-21 17:02:05 -0400 (Thu, 21 Oct 2010)
New Revision: 9807
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Live cursors update
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-21
14:45:43 UTC (rev 9806)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-21
21:02:05 UTC (rev 9807)
@@ -105,8 +105,9 @@
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
+ size.set((int)file.size());
// Using direct buffer, as described on
https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = ByteBuffer.allocateDirect((int)file.size());
+ ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
file.position(0);
file.read(buffer2);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-21
14:45:43 UTC (rev 9806)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-21
21:02:05 UTC (rev 9807)
@@ -20,9 +20,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -472,9 +470,32 @@
firstPageId = fileId;
}
}
-
- if (numberOfPages != 0)
+
+ if (currentPageId != 0)
{
+ currentPage = createPage(currentPageId);
+ currentPage.open();
+
+ List<PagedMessage> messages = currentPage.read();
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ for (PagedMessage msg : messages)
+ {
+ msg.initMessage(storageManager);
+ pageCache.addLiveMessage(msg);
+ }
+
+ currentPage.setLiveCache(pageCache);
+
+ currentPageSize.set(currentPage.getSize());
+
+ cursorProvider.addPageCache(pageCache);
+ }
+
+ if (currentPage != null)
+ {
+
startPaging();
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-21
14:45:43 UTC (rev 9806)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-21
21:02:05 UTC (rev 9807)
@@ -134,16 +134,7 @@
}
- /**
- * @return
- * @throws Exception
- */
- private PageCursor createNonPersistentCursor() throws Exception
- {
- return lookupCursorProvider().createNonPersistentCursor();
- }
-
public void testReadNextPage() throws Exception
{
@@ -160,15 +151,6 @@
assertNull(cache);
}
-
- /**
- * @return
- * @throws Exception
- */
- private PageCursorProvider lookupCursorProvider() throws Exception
- {
- return lookupPageStore(ADDRESS).getCursorProvier();
- }
public void testRestart() throws Exception
@@ -348,7 +330,7 @@
pageStore.startPaging();
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 100;
final int messageSize = 1024 * 1024;
@@ -378,13 +360,82 @@
assertNotNull(readMessage);
- cursor.ack(readMessage.a);
-
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
assertNull(cursor.moveNext());
}
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+
+ for (int i = 0; i < NUM_MESSAGES * 2; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertNotNull(readMessage);
+
+ assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ }
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+
+ for (int i = 0; i < NUM_MESSAGES * 3; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES * 2)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertNotNull(readMessage);
+
+ cursor.ack(readMessage.a);
+
+ assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
+ }
+
+
}
@@ -455,36 +506,6 @@
}
- /**
- * @param storage
- * @param pageStore
- * @param pgParameter
- * @param start
- * @param NUM_MESSAGES
- * @param messageSize
- * @throws Exception
- */
- private void pgMessages(StorageManager storage,
- PagingStoreImpl pageStore,
- PageTransactionInfo pgParameter,
- int start,
- final int NUM_MESSAGES,
- final int messageSize) throws Exception
- {
- List<ServerMessage> messages = new ArrayList<ServerMessage>();
-
- for (int i = start ; i < start + NUM_MESSAGES; i++)
- {
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
- ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- msg.putIntProperty("key", i);
- messages.add(msg);
- }
-
- pageStore.page(messages, pgParameter.getTransactionID());
- }
-
public void testCleanupScenarios() throws Exception
{
// Validate the pages are being cleared (with multiple cursors)
@@ -587,7 +608,7 @@
server.start();
Thread.sleep(1000);
- assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
@@ -631,7 +652,7 @@
OperationContextImpl.clearContext();
- server.start();
+ createServer();
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
@@ -649,11 +670,11 @@
// This is to make sure all the pending files will be deleted
server.stop();
+ OperationContextImpl.clearContext();
+
+ createServer();
- server.start();
-
- // TODO: this should be exact 2
- assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -708,6 +729,17 @@
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
+ createServer();
+
+ //createQueue(ADDRESS.toString(), ADDRESS.toString());
+ }
+
+
+ /**
+ * @throws Exception
+ */
+ private void createServer() throws Exception
+ {
Configuration config = createDefaultConfig();
config.setJournalSyncNonTransactional(true);
@@ -720,11 +752,63 @@
server.start();
- queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+ try
+ {
+ queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursor createNonPersistentCursor() throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor();
+ }
- //createQueue(ADDRESS.toString(), ADDRESS.toString());
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursorProvider lookupCursorProvider() throws Exception
+ {
+ return lookupPageStore(ADDRESS).getCursorProvier();
}
+ /**
+ * @param storage
+ * @param pageStore
+ * @param pgParameter
+ * @param start
+ * @param NUM_MESSAGES
+ * @param messageSize
+ * @throws Exception
+ */
+ private void pgMessages(StorageManager storage,
+ PagingStoreImpl pageStore,
+ PageTransactionInfo pgParameter,
+ int start,
+ final int NUM_MESSAGES,
+ final int messageSize) throws Exception
+ {
+ List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ for (int i = start ; i < start + NUM_MESSAGES; i++)
+ {
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+ ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+ messages.add(msg);
+ }
+
+ pageStore.page(messages, pgParameter.getTransactionID());
+ }
+
+
protected void tearDown() throws Exception
{
server.stop();