Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 15:24:00 -0500 (Fri, 12 Nov 2010)
New Revision: 9883
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding new test
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 19:37:52 UTC (rev 9882)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 20:24:00 UTC (rev 9883)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -621,6 +622,116 @@
}
+
+ /**
+  * Consumes messages through a page subscription while a second thread is
+  * still paging them into the same address.
+  *
+  * The producer thread pages NUM_TX batches of MSGS_TX messages each. Batches
+  * with an even index are paged inside a transaction that is committed at the
+  * end of the batch; batches with an odd index are paged without one. Every
+  * message carries an int property "key" holding a running counter.
+  *
+  * The test thread pulls TOTAL_MSG references from the subscription iterator,
+  * retrying up to five times (sleeping one second after each miss) when the
+  * iterator has nothing to return yet, and acknowledges each reference.
+  *
+  * The producer is always joined before the test returns so that it cannot
+  * keep paging after the test has finished, and so that the final failure
+  * count reflects the complete producer run.
+  *
+  * @throws Exception on any failure in the paging or cursor calls, or if the
+  *                   test thread is interrupted while sleeping or joining
+  */
+ public void testConsumeLivePageMultiThread() throws Exception
+ {
+    final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+    pageStore.startPaging();
+
+    final int NUM_TX = 100;
+
+    final int MSGS_TX = 100;
+
+    final int TOTAL_MSG = NUM_TX * MSGS_TX;
+
+    final int messageSize = 1024;
+
+    PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+    System.out.println("cursorProvider = " + cursorProvider);
+
+    PageSubscription cursor = this.server.getPagingManager()
+                                         .getPageStore(ADDRESS)
+                                         .getCursorProvier()
+                                         .getSubscription(queue.getID());
+
+    System.out.println("Cursor: " + cursor);
+
+    final StorageManager storage = this.server.getStorageManager();
+
+    // Counts failures raised on the producer thread; JUnit cannot see them otherwise.
+    final AtomicInteger exceptions = new AtomicInteger(0);
+
+    Thread t1 = new Thread()
+    {
+       public void run()
+       {
+          try
+          {
+             int count = 0;
+
+             for (int txCount = 0; txCount < NUM_TX; txCount++)
+             {
+
+                // Alternate between transactional (even) and non-transactional (odd) batches.
+                Transaction tx = null;
+
+                if (txCount % 2 == 0)
+                {
+                   tx = new TransactionImpl(storage);
+                }
+
+                RoutingContext ctx = generateCTX(tx);
+
+                for (int i = 0 ; i < MSGS_TX; i++)
+                {
+                   //System.out.println("Sending " + count);
+                   HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
+
+                   // NOTE(review): the first constructor argument is the per-batch index, so it
+                   // repeats across batches; only the "key" property is unique per message.
+                   ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+                   msg.putIntProperty("key", count++);
+
+                   msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+                   Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+                }
+
+                if (tx != null)
+                {
+                   tx.commit();
+                }
+
+             }
+          }
+          catch (Throwable e)
+          {
+             // Includes AssertionFailedError from the assertTrue above.
+             e.printStackTrace();
+             exceptions.incrementAndGet();
+          }
+       }
+    };
+
+    t1.start();
+
+    try
+    {
+       LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+       for (int i = 0 ; i < TOTAL_MSG; i++ )
+       {
+          // Fail fast if the producer has already reported a problem.
+          assertEquals(0, exceptions.get());
+          PagedReference ref = null;
+          // The producer may not have paged the next message yet: poll a few times.
+          for (int repeat = 0 ; repeat < 5; repeat++)
+          {
+             ref = iterator.next();
+             if (ref == null)
+             {
+                Thread.sleep(1000);
+             }
+             else
+             {
+                break;
+             }
+          }
+          assertNotNull(ref);
+
+          ref.acknowledge();
+
+          System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
+          // Strict ordering check left disabled, as in the original revision.
+          //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
+       }
+    }
+    finally
+    {
+       // Never leave the producer running past the end of the test, even when
+       // an assertion above has failed.
+       t1.join();
+    }
+
+    // Checked after the join so that late producer failures are not missed.
+    assertEquals(0, exceptions.get());
+ }
+
private RoutingContextImpl generateCTX()
{
return generateCTX(null);