Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 14:37:52 -0500 (Fri, 12 Nov 2010)
New Revision: 9882
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
tweak on ordering
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12
16:56:31 UTC (rev 9881)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12
19:37:52 UTC (rev 9882)
@@ -296,7 +296,7 @@
PagePosition retPos = pos.nextMessage();
PageCache cache = cursorProvider.getPageCache(pos);
-
+
if (cache == null)
{
return null;
@@ -1014,40 +1014,19 @@
try
{
- synchronized (redeliveries)
- {
- if (redeliveryIterator.hasNext())
- {
- // There's a redelivery pending, we will get it out of that pool
instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
- }
- else
- {
- isredelivery = false;
- }
- }
-
if (position == null)
{
position = getStartPosition();
}
- PagePosition previousPos = position;
- PagedReference nextPos = moveNext();
- if (nextPos != null)
- {
- lastOperation = previousPos;
- position = nextPos.getPosition();
- }
- return nextPos;
+ return moveNext();
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage(), e);
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -1057,18 +1036,32 @@
PagedReference message = null;
+ PagePosition lastPosition = position;
PagePosition tmpPosition = position;
do
{
- message = internalGetNext(tmpPosition);
-
+ synchronized (redeliveries)
+ {
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool
instead
+ isredelivery = true;
+ return getReference(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
+ message = internalGetNext(tmpPosition);
+ }
+
if (message == null)
{
break;
}
-
+
tmpPosition = message.getPosition();
boolean valid = true;
@@ -1079,13 +1072,14 @@
// 1st... is it routed?
valid = routed(message.getPagedMessage());
- if (!valid) ignored = true;
+ if (!valid)
+ ignored = true;
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() != 0)
{
PageTransactionInfo tx =
pageStore.getPagingManager().getTransaction(message.getPagedMessage()
-
.getTransactionID());
+
.getTransactionID());
if (tx == null)
{
log.warn("Couldn't locate page transaction " +
message.getPagedMessage().getTransactionID() +
@@ -1108,7 +1102,8 @@
if (valid)
{
// We don't create a PageCursorInfo unless we are doing a write
operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no
need to control PageCursors is nothing
+ // Say you have a Browser that will only read the files... there's no
need to control PageCursors is
+ // nothing
// is being changed. That's why the false is passed as a parameter
here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
@@ -1116,7 +1111,7 @@
valid = false;
}
}
-
+
if (!ignored)
{
position = message.getPosition();
@@ -1138,10 +1133,14 @@
}
while (message != null && !match);
+ if (message != null)
+ {
+ lastOperation = lastPosition;
+ }
+
return message;
}
-
/** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be
using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12
16:56:31 UTC (rev 9881)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12
19:37:52 UTC (rev 9882)
@@ -144,7 +144,6 @@
public synchronized void commit()
{
- committed = true;
if (lateDeliveries != null)
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
@@ -153,6 +152,7 @@
}
lateDeliveries.clear();
}
+ committed = true;
lateDeliveries = null;
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-12
16:56:31 UTC (rev 9881)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-12
19:37:52 UTC (rev 9882)
@@ -922,7 +922,7 @@
msg.putIntProperty("count", i);
producer.send(msg);
- if (i % 50 == 0 && i != 0)
+ if (i % 100 == 0 && i != 0)
{
sessionProducer.commit();
// Thread.sleep(500);
@@ -967,16 +967,16 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500000);
+ ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- // session.commit();
+ session.commit();
}
}
- // session.commit();
+ session.commit();
session.close();