Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 21:21:13 -0500 (Tue, 01 Feb 2011)
New Revision: 10168
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing Tests and Page Counters
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -96,6 +96,8 @@
void processReload() throws Exception;
+ void addPendingDelivery(final PagePosition position);
+
/**
* To be used on redeliveries
* @param position
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -121,7 +121,7 @@
if (pos.getMessageNr() >= cache.getNumberOfMessages())
{
// sanity check, this should never happen unless there's a bug
- throw new IllegalStateException("Invalid messageNumber passed = " +
pos);
+ throw new IllegalStateException("Invalid messageNumber passed = " +
pos + " on " + cache);
}
return cache.getMessage(pos.getMessageNr());
@@ -255,7 +255,7 @@
cursorList.addAll(activeCursors.values());
long minPage = checkMinPage(cursorList);
-
+
if (minPage == pagingStore.getCurrentWritingPage() &&
pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -73,8 +73,7 @@
private static void trace(final String message)
{
- // PageCursorImpl.log.info(message);
- System.out.println(message);
+ PageSubscriptionImpl.log.info(message);
}
private volatile boolean autoCleanup = true;
@@ -102,6 +101,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new
ConcurrentLinkedQueue<PagePosition>();
@@ -176,7 +177,7 @@
public long getMessageCount()
{
- return counter.getValue();
+ return counter.getValue() - deliveredCount.get();
}
public PageSubscriptionCounter getCounter()
@@ -272,7 +273,7 @@
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo completePage : completedPages)
- {
+ {
if (isTrace)
{
PageSubscriptionImpl.trace("Removing page " +
completePage.getPageId());
@@ -474,6 +475,11 @@
return consumedPages.firstKey();
}
}
+
+ public void addPendingDelivery(final PagePosition position)
+ {
+ getPageInfo(position).incrementPendingTX();
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -483,6 +489,15 @@
synchronized (redeliveries)
{
redeliveries.add(position);
+ PageCursorInfo pageInfo = consumedPages.get(position.getPageNr());
+ if (pageInfo != null)
+ {
+ pageInfo.decrementPendingTX();
+ }
+ else
+ {
+ // this shouldn't really happen.
+ }
}
}
@@ -823,6 +838,9 @@
// The page was live at the time of the creation
private final boolean wasLive;
+
+ // There's a pending TX to add elements on this page
+ private AtomicInteger pendingTX = new AtomicInteger(0);
// There's a pending delete on the async IO pipe
// We're holding this object to avoid delete the pages before the IO is
complete,
@@ -856,7 +874,7 @@
public boolean isDone()
{
- return getNumberOfMessages() == confirmed.get();
+ return getNumberOfMessages() == confirmed.get() && pendingTX.get() ==
0;
}
public boolean isPendingDelete()
@@ -876,6 +894,16 @@
{
return pageId;
}
+
+ public void incrementPendingTX()
+ {
+ pendingTX.incrementAndGet();
+ }
+
+ public void decrementPendingTX()
+ {
+ pendingTX.decrementAndGet();
+ }
public boolean isRemoved(final PagePosition pos)
{
@@ -967,6 +995,7 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
+ cursor.deliveredCount.decrementAndGet();
}
}
@@ -1201,10 +1230,8 @@
*/
public void remove()
{
- if (!isredelivery)
- {
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
- }
+ deliveredCount.incrementAndGet();
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
/* (non-Javadoc)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -250,6 +250,7 @@
{
if (committed && useRedelivery)
{
+ cursor.addPendingDelivery(cursorPos);
cursor.redeliver(cursorPos);
return true;
}
@@ -271,6 +272,7 @@
{
lateDeliveries = new LinkedList<Pair<PageSubscription,
PagePosition>>();
}
+ cursor.addPendingDelivery(cursorPos);
lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor,
cursorPos));
return true;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -849,8 +849,6 @@
return false;
}
- PagedMessage pagedMessage;
-
if (!message.isDurable())
{
// The address should never be transient when paging (even for non-persistent
messages when paging)
@@ -858,8 +856,9 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx),
installPageTransaction(tx, listCtx));
+ PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx,
listCtx), tx == null ? -1 : tx.getID());
+
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
@@ -868,7 +867,9 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
+
+ installPageTransaction(tx, listCtx, currentPage.getPageId());
+
currentPage.write(pagedMessage);
if (tx != null)
@@ -920,11 +921,11 @@
return ids;
}
- private long installPageTransaction(final Transaction tx, final RouteContextList
listCtx) throws Exception
+ private PageTransactionInfo installPageTransaction(final Transaction tx, final
RouteContextList listCtx, int pageID) throws Exception
{
if (tx == null)
{
- return -1;
+ return null;
}
else
{
@@ -939,7 +940,7 @@
pgTX.increment(listCtx.getNumberOfQueues());
- return tx.getID();
+ return pgTX;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -668,7 +668,7 @@
if (pageSubscription != null)
{
// messageReferences will have depaged messages which we need to discount
from the counter as they are counted on the pageSubscription as well
- return messageReferences.size() - pagedReferences.get() + getScheduledCount()
+ deliveringCount.get() + pageSubscription.getMessageCount();
+ return messageReferences.size() + getScheduledCount() +
deliveringCount.get() + pageSubscription.getMessageCount();
}
else
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-01
17:02:29 UTC (rev 10167)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02
02:21:13 UTC (rev 10168)
@@ -80,7 +80,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PagingTest.class);
- private static final int RECEIVE_TIMEOUT = 30000;
+ private static final int RECEIVE_TIMEOUT = 5000;
private static final int PAGE_MAX = 100 * 1024;
@@ -237,6 +237,8 @@
sessionCheck.close();
+ System.out.println(queue.getMessagesAdded());
+
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -359,7 +361,7 @@
final int messageSize = 1024;
- final int numberOfMessages = 30000;
+ final int numberOfMessages = 3000;
final byte[] body = new byte[messageSize];
@@ -563,7 +565,7 @@
final int numberOfIntegers = 256;
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 1000;
try
{
@@ -1032,6 +1034,7 @@
ClientMessage message = sessionNonTX.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
+ message.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i));
producerTransacted.send(message);
@@ -1041,6 +1044,7 @@
for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
+ msgSend.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i + ", j=" + j));
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
producerNonTransacted.send(msgSend);
}
@@ -1403,7 +1407,7 @@
server.start();
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 1000;
final int numberOfBytes = 1024;