Author: clebert.suconic(a)jboss.com
Date: 2010-11-12 21:43:47 -0500 (Fri, 12 Nov 2010)
New Revision: 9884
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
tweaks
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
@@ -209,8 +209,8 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
- " now since it's the current page");
+ // PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+ // " now since it's the current page");
}
else
{
@@ -1030,115 +1030,120 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized PagedReference moveNext() throws Exception
+ public PagedReference moveNext() throws Exception
{
- boolean match = false;
+ synchronized (PageSubscriptionImpl.this)
+ {
+ boolean match = false;
- PagedReference message = null;
+ PagedReference message = null;
- PagePosition lastPosition = position;
- PagePosition tmpPosition = position;
+ PagePosition lastPosition = position;
+ PagePosition tmpPosition = position;
- do
- {
- synchronized (redeliveries)
+ do
{
- if (redeliveryIterator.hasNext())
+ synchronized (redeliveries)
{
- // There's a redelivery pending, we will get it out of that pool instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool instead
+ isredelivery = true;
+ PagedReference redeliveredMsg = getReference(redeliveryIterator.next());
+
+ return redeliveredMsg;
+ }
+ else
+ {
+ isredelivery = false;
+ }
+
+ message = internalGetNext(tmpPosition);
}
- else
+
+ if (message == null)
{
- isredelivery = false;
+ break;
}
- message = internalGetNext(tmpPosition);
- }
+ tmpPosition = message.getPosition();
- if (message == null)
- {
- break;
- }
+ boolean valid = true;
+ boolean ignored = false;
- tmpPosition = message.getPosition();
+ // Validate the scenarios where the message should be considered not valid even to be considered
- boolean valid = true;
- boolean ignored = false;
+ // 1st... is it routed?
- // Validate the scenarios where the message should be considered not valid even to be considered
+ valid = routed(message.getPagedMessage());
+ if (!valid)
+ ignored = true;
- // 1st... is it routed?
-
- valid = routed(message.getPagedMessage());
- if (!valid)
- ignored = true;
-
- // 2nd ... if TX, is it committed?
- if (valid && message.getPagedMessage().getTransactionID() != 0)
- {
- PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
- .getTransactionID());
- if (tx == null)
+ // 2nd ... if TX, is it committed?
+ if (valid && message.getPagedMessage().getTransactionID() != 0)
{
- log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
- ", ignoring message on position " +
- message.getPosition());
- valid = false;
- ignored = true;
+ PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+ .getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+ ", ignoring message on position " +
+ message.getPosition());
+ valid = false;
+ ignored = true;
+ }
+ else
+ {
+ if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+ {
+ valid = false;
+ ignored = false;
+ }
+ }
}
- else
+
+ // 3rd... was it previously removed?
+ if (valid)
{
- if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+ // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is
+ // nothing
+ // is being changed. That's why the false is passed as a parameter here
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+ if (info != null && info.isRemoved(message.getPosition()))
{
valid = false;
- ignored = false;
}
}
- }
- // 3rd... was it previously removed?
- if (valid)
- {
- // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is
- // nothing
- // is being changed. That's why the false is passed as a parameter here
- PageCursorInfo info = getPageInfo(message.getPosition(), false);
- if (info != null && info.isRemoved(message.getPosition()))
+ if (!ignored)
{
- valid = false;
+ position = message.getPosition();
}
- }
- if (!ignored)
- {
- position = message.getPosition();
- }
+ if (valid)
+ {
+ match = match(message.getMessage());
- if (valid)
- {
- match = match(message.getMessage());
-
- if (!match)
+ if (!match)
+ {
+ processACK(message.getPosition());
+ }
+ }
+ else if (ignored)
{
- processACK(message.getPosition());
+ positionIgnored(message.getPosition());
}
}
- else if (ignored)
+ while (message != null && !match);
+
+ if (message != null)
{
- positionIgnored(message.getPosition());
+ lastOperation = lastPosition;
}
- }
- while (message != null && !match);
- if (message != null)
- {
- lastOperation = lastPosition;
+ return message;
}
-
- return message;
}
/** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
@@ -51,6 +51,8 @@
private volatile long recordID = -1;
private volatile boolean committed = false;
+
+ private volatile boolean useRedelivery = false;
private volatile boolean rolledback = false;
@@ -237,6 +239,12 @@
*/
public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
{
+ if (committed && useRedelivery)
+ {
+ cursor.redeliver(cursorPos);
+ return true;
+ }
+ else
if (committed)
{
return false;
@@ -249,6 +257,7 @@
}
else
{
+ useRedelivery = true;
if (lateDeliveries == null)
{
lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
@@ -1273,7 +1273,7 @@
if (msgsToDeliver > 0)
{
//System.out.println("Depaging " + msgsToDeliver + " messages");
- System.out.println("Depage " + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+ //System.out.println("Depage " + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
int nmessages = 0;
while (nmessages < msgsToDeliver && pageIterator.hasNext())
@@ -1283,12 +1283,12 @@
pageIterator.remove();
}
- System.out.println("Depaged " + nmessages);
+ //System.out.println("Depaged " + nmessages);
}
- else
- {
- System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
- }
+// else
+// {
+// System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+// }
deliverAsync();
}