Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 16:21:08 -0500 (Mon, 08 Nov 2010)
New Revision: 9854
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -30,6 +30,9 @@
{
ServerMessage getMessage();
+ /** The queues that were routed during paging */
+ long[] getQueueIDs();
+
void initMessage(StorageManager storageManager);
long getTransactionID();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -13,11 +13,10 @@
package org.hornetq.core.paging;
-import java.util.List;
-
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -59,10 +58,8 @@
void sync() throws Exception;
- boolean page(List<ServerMessage> messages, long transactionId) throws
Exception;
+ boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
- boolean page(ServerMessage message) throws Exception;
-
Page createPage(final int page) throws Exception;
PagingManager getPagingManager();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -32,6 +32,7 @@
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -135,6 +136,12 @@
else if (retPos != null)
{
cursorPos = retPos.getPosition();
+
+ if (!routed(retPos.getPagedMessage(), cursor))
+ {
+ cursor.positionIgnored(cursorPos);
+ }
+ else
if (retPos.getPagedMessage().getTransactionID() != 0)
{
PageTransactionInfo tx =
pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
@@ -160,6 +167,20 @@
}
}
}
+
+ private boolean routed(PagedMessage message, PageSubscription subs)
+ {
+ long id = subs.getId();
+
+ for (long qid : message.getQueueIDs())
+ {
+ if (qid == id)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
private PagedReferenceImpl internalGetNext(final PagePosition pos)
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -49,17 +49,20 @@
private byte[] largeMessageLazyData;
private ServerMessage message;
+
+ private long queueIDs[];
private long transactionID = 0;
- public PagedMessageImpl(final ServerMessage message, final long transactionID)
+ public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long
transactionID)
{
- this.message = message;
+ this(message, queueIDs);
this.transactionID = transactionID;
}
- public PagedMessageImpl(final ServerMessage message)
+ public PagedMessageImpl(final ServerMessage message, final long[] queueIDs)
{
+ this.queueIDs = queueIDs;
this.message = message;
}
@@ -87,6 +90,11 @@
{
return transactionID;
}
+
+ public long[] getQueueIDs()
+ {
+ return queueIDs;
+ }
// EncodingSupport implementation --------------------------------
@@ -112,6 +120,15 @@
message.decode(buffer);
}
+
+ int queueIDsSize = buffer.readInt();
+
+ queueIDs = new long[queueIDsSize];
+
+ for (int i = 0 ; i < queueIDsSize; i++)
+ {
+ queueIDs[i] = buffer.readLong();
+ }
}
public void encode(final HornetQBuffer buffer)
@@ -123,11 +140,19 @@
buffer.writeInt(message.getEncodeSize());
message.encode(buffer);
+
+ buffer.writeInt(queueIDs.length);
+
+ for (int i = 0 ; i < queueIDs.length; i++)
+ {
+ buffer.writeLong(queueIDs[i]);
+ }
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT +
message.getEncodeSize();
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT +
message.getEncodeSize() +
+ DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.api.core.SimpleString;
@@ -46,6 +45,7 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -304,19 +304,13 @@
return storeName;
}
- public boolean page(final List<ServerMessage> message, final long transactionID)
throws Exception
+ public boolean page(final ServerMessage message, final RoutingContext ctx) throws
Exception
{
// The sync on transactions is done on commit only
- return page(message, transactionID, false);
+ // TODO: sync on paging
+ return page(message, ctx, false);
}
- public boolean page(final ServerMessage message) throws Exception
- {
- // If non Durable, there is no need to sync as there is no requirement for
persistence for those messages in case
- // of crash
- return page(Arrays.asList(message), -1, syncNonTransactional &&
message.isDurable());
- }
-
public void sync() throws Exception
{
lock.readLock().lock();
@@ -881,7 +875,7 @@
}
- protected boolean page(final List<ServerMessage> messages, final long
transactionID, final boolean sync) throws Exception
+ protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean
sync) throws Exception
{
if (!running)
{
@@ -939,37 +933,27 @@
return false;
}
- for (ServerMessage message : messages)
+ PagedMessage pagedMessage;
+
+ if (!message.isDurable())
{
- PagedMessage pagedMessage;
+ // The address should never be transient when paging (even for non-persistent
messages when paging)
+ // This will force everything to be persisted
+ message.bodyChanged();
+ }
- if (!message.isDurable())
- {
- // The address should never be transient when paging (even for
non-persistent messages when paging)
- // This will force everything to be persisted
- message.bodyChanged();
- }
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx),
getTransactionID(ctx));
- if (transactionID != -1)
- {
- pagedMessage = new PagedMessageImpl(message, transactionID);
- }
- else
- {
- pagedMessage = new PagedMessageImpl(message);
- }
+ int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
- int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
+ {
+ // Make sure nothing is currently validating or using currentPage
+ openNewPage();
+ }
- if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
- {
- // Make sure nothing is currently validating or using currentPage
- openNewPage();
- }
+ currentPage.write(pagedMessage);
- currentPage.write(pagedMessage);
- }
-
return true;
}
finally
@@ -979,6 +963,36 @@
}
+ private long[] getQueueIDs(RoutingContext ctx)
+ {
+ long ids[] = new long [ctx.getDurableQueues().size() +
ctx.getNonDurableQueues().size()];
+ int i = 0;
+
+ for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+ {
+ ids[i++] = q.getID();
+ }
+
+ for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+ {
+ ids[i++] = q.getID();
+ }
+ return ids;
+ }
+
+ private long getTransactionID(RoutingContext ctx)
+ {
+ Transaction tx = ctx.getTransaction();
+ if (tx == null)
+ {
+ return 0l;
+ }
+ else
+ {
+ return tx.getID();
+ }
+ }
+
/**
* This method will remove files from the page system and and route them, doing it
transactionally
*
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -227,16 +227,6 @@
{
return pageStore;
}
-
- public void paged(final ServerMessage message)
- {
-
- }
-
- public boolean page(final ServerMessage message) throws Exception
- {
- return pageStore.page(message);
- }
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -56,10 +56,6 @@
PagingStore getPagingStore();
- boolean page() throws Exception;
-
- boolean page(long transactionID) throws Exception;
-
boolean storeIsPaging();
void encodeMessageIDToBuffer();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -249,30 +249,6 @@
return pagingStore;
}
- public boolean page() throws Exception
- {
- if (pagingStore != null)
- {
- return pagingStore.page(this);
- }
- else
- {
- return false;
- }
- }
-
- public boolean page(final long transactionID) throws Exception
- {
- if (pagingStore != null)
- {
- return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
- }
- else
- {
- return false;
- }
- }
-
public boolean storeIsPaging()
{
if (pagingStore != null)
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -1014,9 +1014,9 @@
syncNonTransactional);
}
- protected boolean page(final List<ServerMessage> messages, final long
transactionID, final boolean sync) throws Exception
+ protected boolean page(ServerMessage message,
org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
{
- boolean paged = super.page(messages, transactionID, sync);
+ boolean paged = super.page(message, ctx, sync);
if (paged)
{
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -42,11 +42,14 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.LinkedListIterator;
@@ -70,6 +73,8 @@
private HornetQServer server;
private Queue queue;
+
+ private List<Queue> queueList;
private static final int PAGE_MAX = -1;
@@ -155,10 +160,6 @@
final int NUM_MESSAGES = 100;
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
PageSubscription cursorEven = createNonPersistentCursor(new Filter()
{
@@ -205,6 +206,10 @@
});
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
queue.getPageSubscription().close();
PagedReference msg;
@@ -493,6 +498,8 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
+
+ RoutingContextImpl ctx = generateCTX();
LinkedListIterator<PagedReference> iterator = cursor.iterator();
@@ -508,7 +515,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
PagedReference readMessage = iterator.next();
@@ -545,7 +552,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
PagedReference readMessage = iterator.next();
@@ -581,7 +588,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
PagedReference readMessage = iterator.next();
@@ -615,6 +622,24 @@
assertFalse(lookupPageStore(ADDRESS).isPaging());
}
+
+ private RoutingContextImpl generateCTX()
+ {
+ return generateCTX(null);
+ }
+
+ private RoutingContextImpl generateCTX(Transaction tx)
+ {
+ RoutingContextImpl ctx = new RoutingContextImpl(tx);
+ ctx.addDurableQueue(queue);
+
+ for (Queue q : this.queueList)
+ {
+ ctx.addQueue(q);
+ }
+
+ return ctx;
+ }
/**
* @throws Exception
@@ -783,15 +808,19 @@
final int NUM_MESSAGES = 100;
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
PageCursorProvider cursorProvider = lookupCursorProvider();
PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
+
+ this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
+
+ this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
queue.getPageSubscription().close();
PagedReference msg;
@@ -856,16 +885,18 @@
final int NUM_MESSAGES = 100;
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+
+ queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
+
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-
queue.getPageSubscription().close();
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
@@ -1044,6 +1075,8 @@
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
pageStore.startPaging();
+
+ RoutingContext ctx = generateCTX();
for (int i = start; i < start + numMessages; i++)
{
@@ -1058,7 +1091,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg));
+ Assert.assertTrue(pageStore.page(msg, ctx));
}
return pageStore.getNumberOfPages();
@@ -1077,12 +1110,23 @@
// Protected -----------------------------------------------------
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ queue = null;
+ queueList = null;
+ super.tearDown();
+ }
+
protected void setUp() throws Exception
{
super.setUp();
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
+ queueList = new ArrayList<Queue>();
+
createServer();
}
@@ -1117,7 +1161,9 @@
*/
private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
{
- return
lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(),
filter, false);
+ long id = server.getStorageManager().generateUniqueID();
+ queueList.add(new FakeQueue(new SimpleString(filter.toString()), id));
+ return lookupCursorProvider().createSubscription(id, filter, false);
}
/**
@@ -1145,7 +1191,10 @@
final int NUM_MESSAGES,
final int messageSize) throws Exception
{
- List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ TransactionImpl txImpl = new TransactionImpl(pgParameter.getTransactionID(), null,
null);
+
+ RoutingContext ctx = generateCTX(txImpl);
for (int i = start; i < start + NUM_MESSAGES; i++)
{
@@ -1153,20 +1202,11 @@
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
- messages.add(msg);
+ pageStore.page(msg, ctx);
}
- pageStore.page(messages, pgParameter.getTransactionID());
}
- protected void tearDown() throws Exception
- {
- server.stop();
- server = null;
- queue = null;
- super.tearDown();
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -288,7 +288,7 @@
replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new
FakeData());
- PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+ PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
manager.pageWrite(pgmsg, 1);
manager.pageWrite(pgmsg, 2);
manager.pageWrite(pgmsg, 3);
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -223,7 +223,7 @@
msg.setAddress(simpleDestination);
- page.write(new PagedMessageImpl(msg));
+ page.write(new PagedMessageImpl(msg, new long [0]));
Assert.assertEquals(initialNumberOfMessages + i + 1,
page.getNumberOfMessages());
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -28,6 +28,7 @@
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -81,11 +82,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"),
createRandomBuffer(10));
- Assert.assertFalse(store.page(msg));
+ Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
store.startPaging();
- Assert.assertTrue(store.page(msg));
+ Assert.assertTrue(store.page(msg, new RoutingContextImpl(null)));
Page page = store.depage();
@@ -107,7 +108,7 @@
Assert.assertNull(store.depage());
- Assert.assertFalse(store.page(msg));
+ Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -64,6 +64,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -202,7 +203,7 @@
Assert.assertTrue(storeImpl.isPaging());
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -265,7 +266,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
}
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -345,7 +346,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
}
Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -381,7 +382,7 @@
ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Page newPage = storeImpl.depage();
@@ -399,11 +400,11 @@
Assert.assertFalse(storeImpl.isPaging());
- Assert.assertFalse(storeImpl.page(msg));
+ Assert.assertFalse(storeImpl.page(msg, new RoutingContextImpl(null)));
storeImpl.startPaging();
- Assert.assertTrue(storeImpl.page(msg));
+ Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
Page page = storeImpl.depage();
@@ -499,7 +500,7 @@
// This is possible because the depage thread is not actually reading
the pages.
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(id, storeImpl, destination,
createRandomBuffer(id, 5));
- if (storeImpl.page(msg))
+ if (storeImpl.page(msg, new RoutingContextImpl(null)))
{
buffers.put(id, msg);
}
@@ -644,7 +645,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination,
createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg);
+ storeImpl2.page(lastMsg, new RoutingContextImpl(null));
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -752,7 +753,7 @@
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(i, storeImpl, destination,
createRandomBuffer(i, 1024));
msg.putLongProperty("count", i);
- while (!storeImpl.page(msg))
+ while (!storeImpl.page(msg, new RoutingContextImpl(null)))
{
storeImpl.startPaging();
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-08
18:03:29 UTC (rev 9853)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-08
21:21:08 UTC (rev 9854)
@@ -92,10 +92,18 @@
}
private final SimpleString name;
+
+ private final long id;
public FakeQueue(final SimpleString name)
{
+ this(name, 0);
+ }
+
+ public FakeQueue(final SimpleString name, final long id)
+ {
this.name = name;
+ this.id = id;
}
/* (non-Javadoc)
@@ -354,8 +362,7 @@
*/
public long getID()
{
- // TODO Auto-generated method stub
- return 0;
+ return id;
}
/* (non-Javadoc)