[hornetq-commits] JBoss hornetq SVN: r10144 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jan 25 13:00:45 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-25 13:00:45 -0500 (Tue, 25 Jan 2011)
New Revision: 10144
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix on paging counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 18:00:45 UTC (rev 10144)
@@ -100,6 +100,9 @@
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
+
+ // The number of paged references currently held in the messageReferences priority list
+ private final AtomicInteger pagedReferences = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -316,16 +319,17 @@
return;
}
- messageReferences.addHead(ref, ref.getMessage().getPriority());
+ internalAddHead(ref);
directDeliver = false;
}
+
public synchronized void reload(final MessageReference ref)
{
if (!scheduledDeliveryHandler.checkAndSchedule(ref))
{
- messageReferences.addTail(ref, ref.getMessage().getPriority());
+ internalAddTail(ref);
}
directDeliver = false;
@@ -621,6 +625,7 @@
if (ref.getMessage().getMessageID() == id)
{
iterator.remove();
+ refRemoved(ref);
removed = ref;
@@ -662,7 +667,8 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+ // messageReferences also holds depaged messages, which are counted by the pageSubscription as well, so we subtract them here to avoid counting them twice
+ return messageReferences.size() - pagedReferences.get() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
}
else
{
@@ -778,7 +784,7 @@
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
{
- messageReferences.addHead(reference, reference.getMessage().getPriority());
+ internalAddHead(reference);
}
resetAllIterators();
@@ -839,6 +845,7 @@
deliveringCount.incrementAndGet();
acknowledge(tx, ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -872,6 +879,7 @@
deliveringCount.incrementAndGet();
acknowledge(tx, ref);
iter.remove();
+ refRemoved(ref);
deleted = true;
break;
}
@@ -894,6 +902,7 @@
deliveringCount.incrementAndGet();
expire(ref);
iter.remove();
+ refRemoved(ref);
return true;
}
}
@@ -915,6 +924,7 @@
deliveringCount.incrementAndGet();
expire(tx, ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -936,6 +946,7 @@
deliveringCount.incrementAndGet();
expire(ref);
iter.remove();
+ refRemoved(ref);
}
}
}
@@ -952,6 +963,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
+ refRemoved(ref);
return true;
}
}
@@ -971,6 +983,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -992,6 +1005,7 @@
if (ref.getMessage().getMessageID() == messageID)
{
iter.remove();
+ refRemoved(ref);
deliveringCount.incrementAndGet();
try
{
@@ -1097,6 +1111,7 @@
if (ref.getMessage().getMessageID() == messageID)
{
iter.remove();
+ refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
return true;
@@ -1118,6 +1133,7 @@
{
count++;
iter.remove();
+ refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
}
@@ -1186,13 +1202,38 @@
// Private
// ------------------------------------------------------------------------------
+ /**
+ * @param ref
+ */
+ private void internalAddTail(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ messageReferences.addTail(ref, ref.getMessage().getPriority());
+ }
+
+ /**
+ * @param ref
+ */
+ private void internalAddHead(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ messageReferences.addHead(ref, ref.getMessage().getPriority());
+ }
+
+
private synchronized void doPoll()
{
MessageReference ref = concurrentQueue.poll();
if (ref != null)
{
- messageReferences.addTail(ref, ref.getMessage().getPriority());
+ internalAddTail(ref);
messagesAdded++;
@@ -1264,6 +1305,8 @@
if (checkExpired(ref))
{
holder.iter.remove();
+
+ refRemoved(ref);
continue;
}
@@ -1289,6 +1332,8 @@
if (status == HandleStatus.HANDLED)
{
holder.iter.remove();
+
+ refRemoved(ref);
if (groupID != null && groupConsumer == null)
{
@@ -1333,6 +1378,18 @@
scheduleDepage();
}
}
+
+
+ /**
+ * @param ref
+ */
+ private void refRemoved(MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.decrementAndGet();
+ }
+ }
private void scheduleDepage()
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 18:00:45 UTC (rev 10144)
@@ -473,7 +473,7 @@
ClientSession sess = sf.createSession(true, true, 0);
sess.start();
ClientConsumer cons = sess.createConsumer(ADDRESS);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < 100; i++)
{
ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
@@ -510,14 +510,22 @@
}
session.commit();
+
+ q1.getMessageCount();
t1.start();
t1.join();
assertEquals(0, errors.get());
+ long timeout = System.currentTimeMillis() + 10000;
+ while (numberOfMessages -100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
+ {
+ Thread.sleep(500);
+
+ }
assertEquals(numberOfMessages, q2.getMessageCount());
- assertEquals(numberOfMessages - 10, q1.getMessageCount());
+ assertEquals(numberOfMessages - 100, q1.getMessageCount());
}
catch (Throwable e)
More information about the hornetq-commits
mailing list