Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 11:56:31 -0500 (Fri, 12 Nov 2010)
New Revision: 9881
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
fix TX Ordering on paging
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 06:51:54 UTC (rev 9880)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 16:56:31 UTC (rev 9881)
@@ -291,94 +291,6 @@
return new CursorIterator();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
- */
- public synchronized PagedReference moveNext(PagePosition position) throws Exception
- {
- boolean match = false;
-
- PagedReference message = null;
-
- PagePosition tmpPosition = position;
-
- do
- {
- message = internalGetNext(tmpPosition);
-
-
- if (message == null)
- {
- break;
- }
-
- tmpPosition = message.getPosition();
-
- boolean valid = true;
- boolean ignored = false;
-
- // Validate the scenarios where the message should be considered not valid even to be considered
-
- // 1st... is it routed?
-
- valid = routed(message.getPagedMessage());
- if (!valid) ignored = true;
-
- // 2nd ... if TX, is it committed?
- if (valid && message.getPagedMessage().getTransactionID() != 0)
- {
- PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
- .getTransactionID());
- if (tx == null)
- {
- log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
- ", ignoring message on position " +
- message.getPosition());
- valid = false;
- ignored = true;
- }
- else
- {
- if (tx.deliverAfterCommit(this, message.getPosition()))
- {
- valid = false;
- ignored = false;
- }
- }
- }
-
- // 3rd... was it previously removed?
- if (valid)
- {
- // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
- // is being changed. That's why the false is passed as a parameter here
- PageCursorInfo info = getPageInfo(message.getPosition(), false);
- if (info != null && info.isRemoved(message.getPosition()))
- {
- valid = false;
- }
- }
-
- if (valid)
- {
- match = match(message.getMessage());
-
- if (!match)
- {
- processACK(message.getPosition());
- }
- }
- else if (ignored)
- {
- positionIgnored(message.getPosition());
- }
- }
- while (message != null && !match);
-
- return message;
- }
-
private PagedReference internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -1121,10 +1033,11 @@
position = getStartPosition();
}
- PagedReference nextPos = moveNext(position);
+ PagePosition previousPos = position;
+ PagedReference nextPos = moveNext();
if (nextPos != null)
{
- lastOperation = position;
+ lastOperation = previousPos;
position = nextPos.getPosition();
}
return nextPos;
@@ -1134,7 +1047,101 @@
throw new RuntimeException(e.getMessage(), e);
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
+ */
+ public synchronized PagedReference moveNext() throws Exception
+ {
+ boolean match = false;
+ PagedReference message = null;
+
+ PagePosition tmpPosition = position;
+
+ do
+ {
+ message = internalGetNext(tmpPosition);
+
+
+ if (message == null)
+ {
+ break;
+ }
+
+ tmpPosition = message.getPosition();
+
+ boolean valid = true;
+ boolean ignored = false;
+
+ // Validate the scenarios where the message should be considered not valid even to be considered
+
+ // 1st... is it routed?
+
+ valid = routed(message.getPagedMessage());
+ if (!valid) ignored = true;
+
+ // 2nd ... if TX, is it committed?
+ if (valid && message.getPagedMessage().getTransactionID() != 0)
+ {
+ PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+ .getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+ ", ignoring message on position " +
+ message.getPosition());
+ valid = false;
+ ignored = true;
+ }
+ else
+ {
+ if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+ {
+ valid = false;
+ ignored = false;
+ }
+ }
+ }
+
+ // 3rd... was it previously removed?
+ if (valid)
+ {
+ // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // is being changed. That's why the false is passed as a parameter here
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+ if (info != null && info.isRemoved(message.getPosition()))
+ {
+ valid = false;
+ }
+ }
+
+ if (!ignored)
+ {
+ position = message.getPosition();
+ }
+
+ if (valid)
+ {
+ match = match(message.getMessage());
+
+ if (!match)
+ {
+ processACK(message.getPosition());
+ }
+ }
+ else if (ignored)
+ {
+ positionIgnored(message.getPosition());
+ }
+ }
+ while (message != null && !match);
+
+ return message;
+ }
+
+
/** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
Show replies by date