Author: clebert.suconic(a)jboss.com
Date: 2010-10-22 15:40:19 -0400 (Fri, 22 Oct 2010)
New Revision: 9809
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding filter on cursor
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-22
01:08:55 UTC (rev 9808)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-22
19:40:19 UTC (rev 9809)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
@@ -50,11 +51,13 @@
*/
PageCursor getPersistentCursor(long queueId);
+ PageCursor createPersistentCursor(long queueId, Filter filter);
+
/**
* Create a non persistent cursor, usually associated with browsing
* @return
*/
- PageCursor createNonPersistentCursor();
+ PageCursor createNonPersistentCursor(Filter filter);
Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos)
throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-22
01:08:55 UTC (rev 9808)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-22
19:40:19 UTC (rev 9809)
@@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -70,6 +71,8 @@
private final StorageManager store;
private final long cursorId;
+
+ private final Filter filter;
private final PagingStore pageStore;
@@ -96,6 +99,7 @@
final PagingStore pageStore,
final StorageManager store,
final Executor executor,
+ final Filter filter,
final long cursorId)
{
this.pageStore = pageStore;
@@ -103,6 +107,7 @@
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
this.executor = executor;
+ this.filter = filter;
}
// Public --------------------------------------------------------
@@ -472,8 +477,14 @@
protected boolean match(final ServerMessage message)
{
- // To be used with expressions
- return true;
+      // A cursor created without a filter accepts every message;
+      // otherwise the decision is delegated to the filter.
+      if (filter != null)
+      {
+         return filter.match(message);
+      }
+
+      return true;
}
// Private -------------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-22
01:08:55 UTC (rev 9808)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-22
19:40:19 UTC (rev 9809)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -92,32 +93,34 @@
{
return pagingStore;
}
+
+   /**
+    * Creates the persistent cursor identified by {@code cursorID} and registers it as active.
+    * Unlike {@link #getPersistentCursor(long)}, which only looks a cursor up, this method
+    * refuses to create a second cursor for an id that is already registered.
+    *
+    * @param cursorID id the cursor is registered under
+    * @param filter filter handed to the new cursor; may be null
+    * @return the newly created and registered cursor
+    * @throws IllegalStateException if a cursor with this id had already been created
+    */
+   public PageCursor createPersistentCursor(long cursorID, Filter filter)
+   {
+      // Cheap early rejection, so that no executor is requested for a duplicate id
+      if (activeCursors.get(cursorID) != null)
+      {
+         throw new IllegalStateException("Cursor " + cursorID + " had already been created");
+      }
+
+      PageCursor activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+
+      // Register atomically: a separate get() followed by put() would let two concurrent
+      // callers both pass the check above and silently overwrite each other's cursor
+      if (activeCursors.putIfAbsent(cursorID, activeCursor) != null)
+      {
+         throw new IllegalStateException("Cursor " + cursorID + " had already been created");
+      }
+
+      return activeCursor;
+   }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
public PageCursor getPersistentCursor(long cursorID)
{
- PageCursor activeCursor = activeCursors.get(cursorID);
- if (activeCursor == null)
- {
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), cursorID);
- PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
- if (previousValue != null)
- {
- activeCursor = previousValue;
- }
- }
-
- return activeCursor;
+ return activeCursors.get(cursorID);
}
/**
* this will create a non-persistent cursor
*/
- public PageCursor createNonPersistentCursor()
+ public PageCursor createNonPersistentCursor(Filter filter)
{
- PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), 0);
+ PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), filter, 0);
nonPersistentCursors.add(cursor);
return cursor;
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-22
01:08:55 UTC (rev 9808)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-22
19:40:19 UTC (rev 9809)
@@ -21,8 +21,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Map.Entry;
import java.util.Set;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -61,7 +61,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
@@ -1214,6 +1213,8 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
+
+
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentCursor(queue.getID(),
filter);
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-22
01:08:55 UTC (rev 9808)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-22
19:40:19 UTC (rev 9809)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
@@ -63,7 +64,7 @@
private SimpleString ADDRESS = new SimpleString("test-add");
private HornetQServer server;
-
+
private Queue queue;
private static final int PAGE_MAX = -1;
@@ -86,7 +87,9 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
+ PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS),
+
server.getStorageManager(),
+
server.getExecutorFactory());
for (int i = 0; i < numberOfPages; i++)
{
@@ -94,15 +97,14 @@
System.out.println("Page " + i + " had " +
cache.getNumberOfMessages() + " messages");
}
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
System.out.println("Cache size = " + cursorProvider.getCacheSize());
}
-
public void testSimpleCursor() throws Exception
{
@@ -111,11 +113,11 @@
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
System.out.println("NumberOfPages = " + numberOfPages);
-
+
PageCursor cursor = createNonPersistentCursor();
-
+
Pair<PagePosition, PagedMessage> msg;
-
+
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
@@ -123,20 +125,104 @@
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
+   /**
+    * Reads the same paged messages through two non persistent cursors, one accepting the
+    * messages whose "even" property is true and the other the ones where it is false, and
+    * validates that each cursor only sees its own messages, in key order.
+    */
+   public void testSimpleCursorWithFilter() throws Exception
+   {
+      final int NUM_MESSAGES = 100;
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      // Accepts only the messages flagged with even=true; a missing property never matches
+      PageCursor cursorEven = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            return new SimpleString("even=true");
+         }
+
+      });
+
+      // Accepts only the messages flagged with even=false; a missing property never matches
+      PageCursor cursorOdd = createNonPersistentCursor(new Filter()
+      {
+
+         public boolean match(ServerMessage message)
+         {
+            Boolean property = message.getBooleanProperty("even");
+            if (property == null)
+            {
+               return false;
+            }
+            else
+            {
+               return !property.booleanValue();
+            }
+         }
+
+         public SimpleString getFilterString()
+         {
+            // Has to describe what match() accepts, which is the negation of the even filter
+            return new SimpleString("even=false");
+         }
+
+      });
+
+      Pair<PagePosition, PagedMessage> msg;
+
+      // Even cursor: keys 0, 2, 4, ... are expected, so key ends at NUM_MESSAGES
+      int key = 0;
+      while ((msg = cursorEven.moveNext()) != null)
+      {
+         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+         assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorEven.ack(msg.a);
+      }
+      assertEquals(NUM_MESSAGES, key);
+
+      // Odd cursor: keys 1, 3, 5, ... are expected, so key ends at NUM_MESSAGES + 1
+      key = 1;
+      while ((msg = cursorOdd.moveNext()) != null)
+      {
+         assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+         assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+         key += 2;
+         cursorOdd.ack(msg.a);
+      }
+      assertEquals(NUM_MESSAGES + 1, key);
+
+      forceGC();
+
+      assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+      // After a restart a single page is expected on the page store
+      server.stop();
+      createServer();
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+   }
+
public void testReadNextPage() throws Exception
{
@@ -147,196 +233,226 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
-
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
+
assertNull(cache);
}
-
-
public void testRestart() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider = lookupCursorProvider();
-
-
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
- PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
+ PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager()
+
.getPageStore(ADDRESS)
+
.getFirstPage(), 0));
+
int firstPageSize = firstPage.getNumberOfMessages();
-
+
firstPage = null;
-
+
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
- for (int i = 0 ; i < 1000 ; i++)
+ for (int i = 0; i < 1000; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-
+
if (i < firstPageSize)
{
cursor.ack(msg.a);
}
}
cursorProvider.printDebug();
-
+
// needs to clear the context since we are using the same thread over two distinct
servers
// otherwise we will get the old executor on the factory
OperationContextImpl.clearContext();
-
+
server.stop();
-
+
server.start();
-
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-
+
cursor.ack(msg.a);
-
+
OperationContextImpl.getContext(null).waitCompletion();
-
+
}
- OperationContextImpl.getContext(null).waitCompletion();
+ OperationContextImpl.getContext(null).waitCompletion();
((PageCursorImpl)cursor).printDebug();
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
+
public void testRestartWithHoleOnAck() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
- for (int i = 0 ; i < 100 ; i++)
+ for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ack(msg.a);
}
}
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
server.start();
-
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
-
-
+
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testRestartWithHoleOnAckAndTransaction() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
+
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- for (int i = 0 ; i < 100 ; i++)
+ for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ackTx(tx, msg.a);
}
}
-
+
tx.commit();
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
server.start();
-
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-
+
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx,msg.a);
+ cursor.ackTx(tx, msg.a);
}
-
+
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx,msg.a);
+ cursor.ackTx(tx, msg.a);
}
-
+
tx.commit();
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
+
}
-
+
public void testConsumeLivePage() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -344,117 +460,131 @@
pageStore.startPaging();
final int NUM_MESSAGES = 100;
-
+
final int messageSize = 1024 * 1024;
-
-
+
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
-
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg));
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
-
+
assertNull(cursor.moveNext());
}
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
pageStore = lookupPageStore(ADDRESS);
-
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 0; i < NUM_MESSAGES * 2; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
if (i >= NUM_MESSAGES)
{
-
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
+
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
+
Assert.assertTrue(pageStore.page(msg));
}
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
}
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
pageStore = lookupPageStore(ADDRESS);
-
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 0; i < NUM_MESSAGES * 3; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
if (i >= NUM_MESSAGES * 2)
{
-
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
+
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
+
Assert.assertTrue(pageStore.page(msg));
}
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
cursor.ack(readMessage.a);
-
+
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
}
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
+
}
-
-
+
public void testPrepareScenarios() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -462,40 +592,46 @@
pageStore.startPaging();
final int NUM_MESSAGES = 100;
-
+
final int messageSize = 100 * 1024;
-
-
+
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor =
pageStore.getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
+
StorageManager storage = this.server.getStorageManager();
-
+
PageTransactionInfoImpl pgtxRollback = new
PageTransactionInfoImpl(storage.generateUniqueID());
PageTransactionInfoImpl pgtxForgotten = new
PageTransactionInfoImpl(storage.generateUniqueID());
PageTransactionInfoImpl pgtxCommit = new
PageTransactionInfoImpl(storage.generateUniqueID());
-
+
System.out.println("Forgetting tx " + pgtxForgotten.getTransactionID());
-
+
this.server.getPagingManager().addTransaction(pgtxRollback);
this.server.getPagingManager().addTransaction(pgtxCommit);
-
+
pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
-
+
addMessages(300, NUM_MESSAGES, messageSize);
-
+
System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
@@ -506,10 +642,10 @@
}
assertNull(cursor.moveNext());
-
+
cursor.printDebug();
pgtxRollback.rollback();
-
+
this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
pgtxCommit.commit();
@@ -521,16 +657,15 @@
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
-
+
assertNull(cursor.moveNext());
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-
+
}
-
+
public void testCloseNonPersistentConsumer() throws Exception
{
@@ -541,12 +676,12 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCursor cursor = cursorProvider.createNonPersistentCursor();
- PageCursorImpl cursor2 =
(PageCursorImpl)cursorProvider.createNonPersistentCursor();
-
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+ PageCursorImpl cursor2 =
(PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
+
Pair<PagePosition, PagedMessage> msg;
-
+
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
@@ -554,18 +689,17 @@
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- for (int i = 0 ; i < 10; i++)
+
+ for (int i = 0; i < 10; i++)
{
msg = cursor2.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
}
-
+
assertSame(cursor2.getProvider(), cursorProvider);
cursor2.close();
@@ -575,13 +709,12 @@
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testLeavePageStateAndRestart() throws Exception
{
- // Validate the cursor are working fine when all the pages are gone, and then
paging being restarted
+ // Validate the cursor are working fine when all the pages are gone, and then
paging being restarted
}
-
+
public void testFirstMessageInTheMiddle() throws Exception
{
@@ -592,20 +725,20 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
+
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageCursor cursor = cursorProvider.createNonPersistentCursor();
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int key = msg.getMessage().getIntProperty("key").intValue();
-
+
msg = null;
-
+
cache = null;
-
+
Pair<PagePosition, PagedMessage> msgCursor = null;
while ((msgCursor = cursor.moveNext()) != null)
{
@@ -613,18 +746,16 @@
cursor.ack(msgCursor.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testFirstMessageInTheMiddlePersistent() throws Exception
{
@@ -635,35 +766,39 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
+
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ // TODO: We should be using getPersistentCursor here, but that is not possible until
+ // createQueue also creates the cursor.
+ // This needs to change after some integration.
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = cursorProvider.createPersistentCursor(queue.getID(), null);
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int initialKey = msg.getMessage().getIntProperty("key").intValue();
int key = initialKey;
-
+
msg = null;
-
+
cache = null;
-
+
Pair<PagePosition, PagedMessage> msgCursor = null;
while ((msgCursor = cursor.moveNext()) != null)
{
assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
}
assertEquals(NUM_MESSAGES, key);
-
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
key = initialKey;
@@ -672,18 +807,17 @@
assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);
}
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
+
private int addMessages(final int numMessages, final int messageSize) throws
Exception
{
return addMessages(0, numMessages, messageSize);
@@ -702,17 +836,20 @@
for (int i = start; i < start + numMessages; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+ // to be used on tests that are validating filters
+ msg.putBooleanProperty("even", i % 2 == 0);
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg));
}
-
+
return pageStore.getNumberOfPages();
}
@@ -734,53 +871,56 @@
super.setUp();
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
-
+
createServer();
-
- //createQueue(ADDRESS.toString(), ADDRESS.toString());
}
-
/**
* @throws Exception
*/
private void createServer() throws Exception
{
OperationContextImpl.clearContext();
-
+
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(true);
- server = createServer(true,
- config,
- PAGE_SIZE,
- PAGE_MAX,
- new HashMap<String, AddressSettings>());
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String,
AddressSettings>());
server.start();
-
+
try
{
queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
}
catch (Exception ignored)
{
- }
+ }
}
+
/**
* @return
* @throws Exception
*/
private PageCursor createNonPersistentCursor() throws Exception
{
- return lookupCursorProvider().createNonPersistentCursor();
+ return lookupCursorProvider().createNonPersistentCursor(null);
}
/**
* @return
* @throws Exception
*/
+ private PageCursor createNonPersistentCursor(Filter filter) throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor(filter);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
private PageCursorProvider lookupCursorProvider() throws Exception
{
return lookupPageStore(ADDRESS).getCursorProvier();
@@ -803,8 +943,8 @@
final int messageSize) throws Exception
{
List<ServerMessage> messages = new ArrayList<ServerMessage>();
-
- for (int i = start ; i < start + NUM_MESSAGES; i++)
+
+ for (int i = start; i < start + NUM_MESSAGES; i++)
{
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
@@ -812,10 +952,9 @@
msg.putIntProperty("key", i);
messages.add(msg);
}
-
+
pageStore.page(messages, pgParameter.getTransactionID());
}
-
protected void tearDown() throws Exception
{