Author: clebert.suconic(a)jboss.com
Date: 2010-11-02 18:44:05 -0400 (Tue, 02 Nov 2010)
New Revision: 9833
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Fixing tests
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
03:33:07 UTC (rev 9832)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
22:44:05 UTC (rev 9833)
@@ -297,6 +297,7 @@
PageCursorInfo info = getPageInfo(message.a, false);
if (info != null && info.isRemoved(message.a))
{
+ tmpPosition = message.a;
valid = false;
}
}
@@ -907,8 +908,7 @@
public boolean isRemoved(final PagePosition pos)
{
- return false;
- //return removedReferences.contains(pos);
+ return removedReferences.contains(pos);
}
public void remove(final PagePosition position)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-02
03:33:07 UTC (rev 9832)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-02
22:44:05 UTC (rev 9833)
@@ -619,7 +619,7 @@
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
- // if the TX paged at least one message on a give address, all the other
addresses should also go towards
+ // if the TX paged at least one message on a give address, all the other message
on the same address should also go towards
// paging cache now
boolean alreadyPaging = false;
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
03:33:07 UTC (rev 9832)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
22:44:05 UTC (rev 9833)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
@@ -129,9 +130,8 @@
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
-
+
server.getStorageManager().waitOnOperations();
-
waitCleanup();
@@ -202,7 +202,7 @@
}
});
-
+
queue.getPageSubscription().close();
Pair<PagePosition, PagedMessage> msg;
@@ -210,7 +210,6 @@
LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven =
cursorEven.iterator();
LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd =
cursorOdd.iterator();
-
int key = 0;
while ((msg = iteratorEven.next()) != null)
@@ -235,7 +234,7 @@
forceGC();
- // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+ // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
server.stop();
createServer();
@@ -271,9 +270,9 @@
PageCursorProvider cursorProvider = lookupCursorProvider();
PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager()
.getPageStore(ADDRESS)
@@ -285,7 +284,7 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
-
+
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
for (int i = 0; i < 1000; i++)
@@ -301,13 +300,10 @@
}
}
cursorProvider.printDebug();
-
server.getStorageManager().waitOnOperations();
lookupPageStore(ADDRESS).flushExecutors();
-
-
// needs to clear the context since we are using the same thread over two distinct
servers
// otherwise we will get the old executor on the factory
OperationContextImpl.clearContext();
@@ -316,13 +312,10 @@
server.start();
- cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
iterator = cursor.iterator();
-
+
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
@@ -337,9 +330,9 @@
}
OperationContextImpl.getContext(null).waitCompletion();
-
+
lookupPageStore(ADDRESS).flushExecutors();
-
+
assertFalse(lookupPageStore(ADDRESS).isPaging());
server.stop();
@@ -362,15 +355,10 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
@@ -390,10 +378,7 @@
server.start();
- cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
@@ -428,15 +413,10 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -462,10 +442,7 @@
server.start();
- cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
iterator = cursor.iterator();
@@ -506,15 +483,10 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -522,8 +494,8 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- //if (i % 100 == 0)
- System.out.println("read/written " + i);
+ // if (i % 100 == 0)
+ System.out.println("read/written " + i);
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
@@ -551,10 +523,7 @@
pageStore = lookupPageStore(ADDRESS);
- cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 2; i++)
@@ -590,10 +559,7 @@
pageStore = lookupPageStore(ADDRESS);
- cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 3; i++)
@@ -622,26 +588,26 @@
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
}
-
+
Pair<PagePosition, PagedMessage> readMessage = iterator.next();
-
+
assertEquals(NUM_MESSAGES * 3,
readMessage.b.getMessage().getIntProperty("key").intValue());
-
+
cursor.ack(readMessage.a);
-
+
server.getStorageManager().waitOnOperations();
pageStore.flushExecutors();
-
+
assertFalse(pageStore.isPaging());
server.stop();
createServer();
-
+
assertFalse(pageStore.isPaging());
waitCleanup();
-
+
assertFalse(lookupPageStore(ADDRESS).isPaging());
}
@@ -654,13 +620,14 @@
{
// The cleanup is done asynchronously, so we need to wait some time
long timeout = System.currentTimeMillis() + 10000;
-
+
while (System.currentTimeMillis() < timeout &&
lookupPageStore(ADDRESS).getNumberOfPages() != 1)
{
Thread.sleep(100);
}
- assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(),
lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
+ assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(),
+ lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
}
public void testPrepareScenarios() throws Exception
@@ -676,15 +643,10 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -759,7 +721,7 @@
PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
-
+
queue.getPageSubscription().close();
Pair<PagePosition, PagedMessage> msg;
@@ -785,8 +747,7 @@
assertSame(cursor2.getProvider(), cursorProvider);
cursor2.close();
-
-
+
lookupPageStore(ADDRESS).flushExecutors();
server.stop();
@@ -796,18 +757,17 @@
}
-
- public void testNoCursors() throws Exception // aki
+ public void testNoCursors() throws Exception
{
final int NUM_MESSAGES = 100;
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
+
ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession();
session.deleteQueue(ADDRESS);
-
+
System.out.println("NumberOfPages = " + numberOfPages);
server.stop();
@@ -831,9 +791,9 @@
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-
+
queue.getPageSubscription().close();
-
+
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
@@ -855,7 +815,7 @@
forceGC();
- // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+ // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
server.stop();
createServer();
@@ -876,11 +836,6 @@
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- // TODO: We should be using getPersisentCursor here but I can't change the
method here until createQueue is not
- // creating the cursor also
- // need to change this after some integration
- // PageCursor cursor =
- //
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
@@ -928,7 +883,54 @@
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
+
+ private int tstProperty(ServerMessage msg)
+ {
+ return msg.getIntProperty("key").intValue();
+ }
+ public void testMultipleIterators() throws Exception
+ {
+
+ final int NUM_MESSAGES = 10;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
+
+ Iterator<Pair<PagePosition, PagedMessage>> iter = cursor.iterator();
+
+ Iterator<Pair<PagePosition, PagedMessage>> iter2 = cursor.iterator();
+
+ assertTrue(iter.hasNext());
+
+ Pair<PagePosition, PagedMessage> msg1 = iter.next();
+
+ Pair<PagePosition, PagedMessage> msg2 = iter2.next();
+
+ assertEquals(tstProperty(msg1.b.getMessage()), tstProperty(msg2.b.getMessage()));
+
+ System.out.println("property = " + tstProperty(msg1.b.getMessage()));
+
+ msg1 = iter.next();
+
+ assertEquals(1, tstProperty(msg1.b.getMessage()));
+
+ iter.remove();
+
+ msg2 = iter2.next();
+
+ assertEquals(2, tstProperty(msg2.b.getMessage()));
+
+ assertTrue(iter2.hasNext());
+
+
+ }
+
private int addMessages(final int numMessages, final int messageSize) throws
Exception
{
return addMessages(0, numMessages, messageSize);
@@ -1014,15 +1016,6 @@
* @return
* @throws Exception
*/
- private PageSubscription createNonPersistentCursor() throws Exception
- {
- return
lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(),
null, false);
- }
-
- /**
- * @return
- * @throws Exception
- */
private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
{
return
lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(),
filter, false);
@@ -1070,6 +1063,8 @@
protected void tearDown() throws Exception
{
server.stop();
+ server = null;
+ queue = null;
super.tearDown();
}