Author: clebert.suconic(a)jboss.com
Date: 2011-11-18 15:05:39 -0500 (Fri, 18 Nov 2011)
New Revision: 11716
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Removing Queue synchronization while dealing with paging
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-11-18 20:01:24 UTC (rev 11715)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-11-18 20:05:39 UTC (rev 11716)
@@ -1612,166 +1612,168 @@
// This method will deliver as many messages as possible until all consumers are busy or there are no more matching
// or available messages
- private synchronized void deliver()
+ private void deliver()
{
- if (paused || consumerList.isEmpty())
+ synchronized (this)
{
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
- }
+ if (paused || consumerList.isEmpty())
+ {
+ return;
+ }
- int busyCount = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+ }
- int nullRefCount = 0;
+ int busyCount = 0;
- int size = consumerList.size();
+ int nullRefCount = 0;
- int endPos = pos == size - 1 ? 0 : size - 1;
+ int size = consumerList.size();
- int numRefs = messageReferences.size();
+ int endPos = pos == size - 1 ? 0 : size - 1;
- int handled = 0;
-
- long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+ int numRefs = messageReferences.size();
- while (handled < numRefs)
- {
- if (handled == MAX_DELIVERIES_IN_LOOP)
- {
- // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+ int handled = 0;
- deliverAsync();
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
- return;
- }
-
- if (System.currentTimeMillis() > timeout)
+ while (handled < numRefs)
{
- if (isTrace)
+ if (handled == MAX_DELIVERIES_IN_LOOP)
{
- log.trace("delivery has been running for too long. Scheduling another delivery task now");
- }
-
- deliverAsync();
-
- return;
- }
-
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
+ // long
- ConsumerHolder holder = consumerList.get(pos);
+ deliverAsync();
- Consumer consumer = holder.consumer;
+ return;
+ }
- if (holder.iter == null)
- {
- holder.iter = messageReferences.iterator();
- }
-
- MessageReference ref;
-
- if (holder.iter.hasNext())
- {
- ref = holder.iter.next();
- }
- else
- {
- ref = null;
- }
-
-
- if (ref == null)
- {
- nullRefCount++;
- }
- else
- {
- if (checkExpired(ref))
+ if (System.currentTimeMillis() > timeout)
{
if (isTrace)
{
- log.trace("Reference " + ref + " being expired");
+ log.trace("delivery has been running for too long. Scheduling another delivery task now");
}
- holder.iter.remove();
- refRemoved(ref);
-
- handled++;
+ deliverAsync();
- continue;
+ return;
}
- Consumer groupConsumer = null;
-
- if (isTrace)
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ if (holder.iter == null)
{
- log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+ holder.iter = messageReferences.iterator();
}
- // If a group id is set, then this overrides the consumer chosen round-robin
+ MessageReference ref;
- SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ if (holder.iter.hasNext())
+ {
+ ref = holder.iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
- if (groupID != null)
+ if (ref == null)
{
- groupConsumer = groups.get(groupID);
+ nullRefCount++;
+ }
+ else
+ {
+ if (checkExpired(ref))
+ {
+ if (isTrace)
+ {
+ log.trace("Reference " + ref + " being expired");
+ }
+ holder.iter.remove();
- if (groupConsumer != null)
+ refRemoved(ref);
+
+ handled++;
+
+ continue;
+ }
+
+ Consumer groupConsumer = null;
+
+ if (isTrace)
{
- consumer = groupConsumer;
+ log.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
- }
- HandleStatus status = handle(ref, consumer);
+ // If a group id is set, then this overrides the consumer chosen round-robin
- if (status == HandleStatus.HANDLED)
- {
- holder.iter.remove();
+ SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
- refRemoved(ref);
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
- if (groupID != null && groupConsumer == null)
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- groups.put(groupID, consumer);
+ holder.iter.remove();
+
+ refRemoved(ref);
+
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ handled++;
}
+ else if (status == HandleStatus.BUSY)
+ {
+ holder.iter.repeat();
- handled++;
+ busyCount++;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ }
}
- else if (status == HandleStatus.BUSY)
- {
- holder.iter.repeat();
- busyCount++;
- }
- else if (status == HandleStatus.NO_MATCH)
+ if (pos == endPos)
{
- }
- }
+ // Round robin'd all
- if (pos == endPos)
- {
- // Round robin'd all
-
- if (nullRefCount + busyCount == size)
- {
- if (log.isDebugEnabled())
+ if (nullRefCount + busyCount == size)
{
- log.debug(this + "::All the consumers were busy, giving up now");
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up now");
+ }
+ break;
}
- break;
+
+ nullRefCount = busyCount = 0;
}
- nullRefCount = busyCount = 0;
- }
+ pos++;
- pos++;
-
- if (pos == size)
- {
- pos = 0;
+ if (pos == size)
+ {
+ pos = 0;
+ }
}
}
@@ -1817,7 +1819,7 @@
}
}
- private synchronized void depage()
+ private void depage()
{
depagePending = false;