Author: clebert.suconic(a)jboss.com
Date: 2010-11-04 18:47:33 -0400 (Thu, 04 Nov 2010)
New Revision: 9846
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
just a backup
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-04
15:05:08 UTC (rev 9845)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-04
22:47:33 UTC (rev 9846)
@@ -37,8 +37,11 @@
long getId();
boolean isPersistent();
+
+ /** Used as a delegate method to pageStore.isPaging() */
+ boolean isPaging();
- public LinkedListIterator<PagedReferenceImpl> iterator();
+ public LinkedListIterator<PagedReference> iterator();
// To be called when the cursor is closed for good. Most likely when the queue is
deleted
void close() throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-04
15:05:08 UTC (rev 9845)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-04
22:47:33 UTC (rev 9846)
@@ -129,6 +129,11 @@
return queue;
}
+ /** Returns whatever pageStore.isPaging() reports; no state of this subscription is consulted. */
+ public boolean isPaging()
+ {
+    final boolean paging = pageStore.isPaging();
+    return paging;
+ }
+
public void setQueue(Queue queue)
{
this.queue = queue;
@@ -168,7 +173,7 @@
ack(position);
}
- class CursorIterator implements LinkedListIterator<PagedReferenceImpl>
+ class CursorIterator implements LinkedListIterator<PagedReference>
{
PagePosition position = getLastPosition();
@@ -204,7 +209,7 @@
/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
- public PagedReferenceImpl next()
+ public synchronized PagedReferenceImpl next()
{
if (cachedNext != null)
@@ -239,7 +244,9 @@
}
}
- public boolean hasNext()
+ /** QueueImpl.deliver may call hasNext() while QueueImpl.depage is calling next() and hasNext() on the same iterator.
+ * That race would be rare, but synchronizing here avoids the scenario altogether. */
+ public synchronized boolean hasNext()
{
// if an unbehaved program called hasNext twice before next, we only cache it
once.
if (cachedNext != null)
@@ -247,6 +254,11 @@
return true;
}
+ if (!pageStore.isPaging())
+ {
+ return false;
+ }
+
cachedNext = next();
return cachedNext != null;
@@ -276,7 +288,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#iterator()
*/
- public LinkedListIterator<PagedReferenceImpl> iterator()
+ public LinkedListIterator<PagedReference> iterator()
{
return new CursorIterator();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-04
15:05:08 UTC (rev 9845)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-04
22:47:33 UTC (rev 9846)
@@ -33,6 +33,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -92,6 +93,8 @@
private final PostOffice postOffice;
private final PageSubscription pageSubscription;
+
+ private final LinkedListIterator<PagedReference> pageIterator;
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new
ConcurrentLinkedQueue<MessageReference>();
@@ -109,6 +112,8 @@
private final Runnable deliverRunner = new DeliverRunner();
+ private final Runnable depageRunner = new DepageRunner();
+
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
@@ -221,7 +226,12 @@
if (pageSubscription != null)
{
pageSubscription.setQueue(this);
+ this.pageIterator = pageSubscription.iterator();
}
+ else
+ {
+ this.pageIterator = null;
+ }
this.executor = executor;
@@ -339,7 +349,7 @@
// We don't recompute it on every delivery since executing isEmpty is expensive
for a ConcurrentQueue
if (checkDirect)
{
- if (direct && !directDeliver && concurrentQueue.isEmpty()
&& messageReferences.isEmpty())
+ if (direct && !directDeliver && concurrentQueue.isEmpty()
&& messageReferences.isEmpty() && !pageIterator.hasNext() &&
!pageSubscription.isPaging())
{
// We must block on the executor to ensure any async deliveries have
completed or we might get out of order
// deliveries
@@ -1225,7 +1235,30 @@
pos = 0;
}
}
+
+ if (messageReferences.size() == 0 && pageIterator.hasNext())
+ {
+ scheduleDepage();
+ }
}
+
+ /** Submits the queue's single depageRunner instance to the executor; the runner in turn invokes depage(). */
+ private void scheduleDepage()
+ {
+    this.executor.execute(this.depageRunner);
+ }
+
+ /**
+  * Hands at most MAX_DELIVERIES_IN_LOOP references from pageIterator to addTail(ref, false),
+  * calling pageIterator.remove() after each one, and then calls deliverAsync().
+  * The bound is checked before hasNext(), so the iterator is not consulted once the limit is reached.
+  */
+ private void depage()
+ {
+    for (int depaged = 0; depaged < MAX_DELIVERIES_IN_LOOP && pageIterator.hasNext(); depaged++)
+    {
+       addTail(pageIterator.next(), false);
+       pageIterator.remove();
+    }
+
+    deliverAsync();
+ }
private void internalAddRedistributor(final Executor executor)
{
@@ -1716,6 +1749,21 @@
}
}
+ /**
+  * Runnable submitted to the executor by scheduleDepage(); each run performs one depage() pass.
+  * Any Exception raised by that pass is logged here and not rethrown.
+  */
+ private class DepageRunner implements Runnable
+ {
+    public void run()
+    {
+       try
+       {
+          depage();
+       }
+       catch (Exception e)
+       {
+          // Report the operation that actually failed: this runner invokes depage(), not deliver()
+          log.error("Failed to depage", e);
+       }
+    }
+ }
+
private class ConcurrentPoller implements Runnable
{
public void run()
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-04
15:05:08 UTC (rev 9845)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-04
22:47:33 UTC (rev 9846)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
@@ -32,7 +31,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagedReferenceImpl;
+import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
@@ -120,9 +119,9 @@
PageSubscription cursor =
lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- PagedReferenceImpl msg;
+ PagedReference msg;
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
{
@@ -205,11 +204,11 @@
queue.getPageSubscription().close();
- PagedReferenceImpl msg;
+ PagedReference msg;
- LinkedListIterator<PagedReferenceImpl> iteratorEven = cursorEven.iterator();
+ LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
- LinkedListIterator<PagedReferenceImpl> iteratorOdd = cursorOdd.iterator();
+ LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
int key = 0;
while ((msg = iteratorEven.next()) != null)
@@ -285,12 +284,12 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
for (int i = 0; i < 1000; i++)
{
System.out.println("Reading Msg : " + i);
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
@@ -319,7 +318,7 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
@@ -361,10 +360,10 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -383,14 +382,14 @@
for (int i = 10; i <= 20; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
cursor.ack(msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
cursor.ack(msg);
}
@@ -422,11 +421,11 @@
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -449,14 +448,14 @@
for (int i = 10; i <= 20; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- PagedReferenceImpl msg = iterator.next();
+ PagedReference msg = iterator.next();
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg);
}
@@ -490,7 +489,7 @@
System.out.println("Cursor: " + cursor);
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES; i++)
{
@@ -506,7 +505,7 @@
Assert.assertTrue(pageStore.page(msg));
- PagedReferenceImpl readMessage = iterator.next();
+ PagedReference readMessage = iterator.next();
assertNotNull(readMessage);
@@ -544,7 +543,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- PagedReferenceImpl readMessage = iterator.next();
+ PagedReference readMessage = iterator.next();
assertNotNull(readMessage);
@@ -580,7 +579,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- PagedReferenceImpl readMessage = iterator.next();
+ PagedReference readMessage = iterator.next();
assertNotNull(readMessage);
@@ -589,7 +588,7 @@
assertEquals(i,
readMessage.getMessage().getIntProperty("key").intValue());
}
- PagedReferenceImpl readMessage = iterator.next();
+ PagedReference readMessage = iterator.next();
assertEquals(NUM_MESSAGES * 3,
readMessage.getMessage().getIntProperty("key").intValue());
@@ -647,7 +646,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getSubscription(queue.getID());
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -676,7 +675,7 @@
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
- PagedReferenceImpl pos = iterator.next();
+ PagedReference pos = iterator.next();
assertNotNull("Null at position " + i, pos);
assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
cursor.ack(pos);
@@ -693,7 +692,7 @@
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
- PagedReferenceImpl pos = iterator.next();
+ PagedReference pos = iterator.next();
assertNotNull(pos);
assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
cursor.ack(pos);
@@ -724,9 +723,9 @@
queue.getPageSubscription().close();
- PagedReferenceImpl msg;
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
- LinkedListIterator<PagedReferenceImpl> iterator2 = cursor.iterator();
+ PagedReference msg;
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator2 = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
@@ -803,9 +802,9 @@
msg = null;
cache = null;
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
- PagedReferenceImpl msgCursor = null;
+ PagedReference msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++,
msgCursor.getMessage().getIntProperty("key").intValue());
@@ -848,9 +847,9 @@
cache = null;
- LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
- PagedReferenceImpl msgCursor = null;
+ PagedReference msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++,
msgCursor.getMessage().getIntProperty("key").intValue());
@@ -902,15 +901,15 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
- LinkedListIterator<PagedReferenceImpl> iter = cursor.iterator();
+ LinkedListIterator<PagedReference> iter = cursor.iterator();
- LinkedListIterator<PagedReferenceImpl> iter2 = cursor.iterator();
+ LinkedListIterator<PagedReference> iter2 = cursor.iterator();
assertTrue(iter.hasNext());
- PagedReferenceImpl msg1 = iter.next();
+ PagedReference msg1 = iter.next();
- PagedReferenceImpl msg2 = iter2.next();
+ PagedReference msg2 = iter2.next();
assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));