[jboss-cvs] JBoss Messaging SVN: r7545 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 8 09:48:29 EDT 2009
Author: jmesnil
Date: 2009-07-08 09:48:29 -0400 (Wed, 08 Jul 2009)
New Revision: 7545
Modified:
trunk/src/main/org/jboss/messaging/core/server/Distributor.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
JBMESSAGING-1505: Optimise multiple message selectors on a queue
* replaced Distributor.peekConsumer() & incrementPosition() by getNextConsumer() method
* refactored QueueImpl deliver() and directDeliver() to use getNextConsumer()
Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-07-08 13:48:29 UTC (rev 7545)
@@ -38,7 +38,5 @@
int getConsumerCount();
- Consumer peekConsumer();
-
- void incrementPosition();
+ Consumer getNextConsumer();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-08 13:48:29 UTC (rev 7545)
@@ -1288,9 +1288,7 @@
{
return;
}
-
- Consumer firstConsumer = distributionPolicy.peekConsumer();
-
+
Consumer consumer;
MessageReference reference;
@@ -1299,12 +1297,13 @@
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
-
+ Set<Consumer> nullReferences = new HashSet<Consumer>();
+
while (true)
{
- consumer = distributionPolicy.peekConsumer();
+ consumer = distributionPolicy.getNextConsumer();
- iterator = iterators.get(consumer);
+ iterator = iterators.get(consumer);
if (iterator == null)
{
@@ -1331,15 +1330,21 @@
if (reference == null)
{
- if (consumer == firstConsumer)
+ nullReferences.add(consumer);
+ if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
startDepaging();
// We delivered all the messages - go into direct delivery
direct = true;
promptDelivery = false;
+ return;
}
- return;
+ continue;
}
+ else
+ {
+ nullReferences.remove(consumer);
+ }
initPagingStore(reference.getMessage().getDestination());
@@ -1350,8 +1355,6 @@
Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
if (groupConsumer != null && groupConsumer != consumer)
{
- // this consumer is not in charge of the message group
- distributionPolicy.incrementPosition();
continue;
}
}
@@ -1469,16 +1472,17 @@
return HandleStatus.BUSY;
}
- Consumer firstConsumer = distributionPolicy.peekConsumer();
-
HandleStatus status;
boolean filterRejected = false;
+ int consumerCount = 0;
+
while (true)
{
- Consumer consumer = distributionPolicy.peekConsumer();
-
+ Consumer consumer = distributionPolicy.getNextConsumer();
+ consumerCount++;
+
final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
if (groupId != null)
@@ -1486,8 +1490,6 @@
Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
if (groupConsumer != null && groupConsumer != consumer)
{
- // this consumer is not in charge in the message group
- distributionPolicy.incrementPosition();
continue;
}
}
@@ -1514,7 +1516,7 @@
}
}
// if we've tried all of them
- if (distributionPolicy.peekConsumer() == firstConsumer)
+ if (consumerCount == distributionPolicy.getConsumerCount())
{
if (filterRejected)
{
@@ -1564,10 +1566,6 @@
}
return HandleStatus.BUSY;
}
- finally
- {
- distributionPolicy.incrementPosition();
- }
if (status == null)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java 2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java 2009-07-08 13:48:29 UTC (rev 7545)
@@ -57,12 +57,14 @@
return super.getConsumerCount();
}
- public Consumer peekConsumer()
+ public synchronized Consumer getNextConsumer()
{
- return consumers.get(pos);
+ Consumer consumer = consumers.get(pos);
+ incrementPosition();
+ return consumer;
}
- public synchronized void incrementPosition()
+ private synchronized void incrementPosition()
{
pos++;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-07-08 13:48:29 UTC (rev 7545)
@@ -1249,16 +1249,12 @@
return false;
}
- public void incrementPosition()
- {
- }
-
public int getConsumerCount()
{
return 0;
}
- public Consumer peekConsumer()
+ public Consumer getNextConsumer()
{
return consumer;
}
More information about the jboss-cvs-commits
mailing list