[Jboss-cvs] JBoss Messaging SVN: r1356 - branches/Branch_1_0/src/main/org/jboss/messaging/core
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 22 17:57:57 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-22 17:57:56 -0400 (Fri, 22 Sep 2006)
New Revision: 1356
Modified:
branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
taking into account that a message can be selector-rejecting when decrementing the delivery count
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-22 21:57:05 UTC (rev 1355)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-22 21:57:56 UTC (rev 1356)
@@ -692,36 +692,41 @@
{
// Reference is not expired
- // Attempt to push the ref to a receiver
+ // Attempt to push the ref to a receiver, so increment the delivery count
+ ref.incrementDeliveryCount();
+
Delivery del = push(ref);
if (del == null)
{
- // no receiver, broken receiver
- // or full receiver
- // so we stop delivering
- if (trace) { log.trace(this + ": no delivery returned for message"
- + ref + " so no receiver got the message");
- log.trace("Delivery is now complete"); }
+ // No receiver, broken receiver or full receiver so we stop delivering; also
+ // we need to decrement the delivery count, as no real delivery has been
+ // actually performed
- receiversReady = false;
+ if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+ ref.decrementDeliveryCount();
+ receiversReady = false;
return;
}
else if (!del.isSelectorAccepted())
{
- // No receiver accepted the message because no selectors matched
- // So we create an iterator (if we haven't already created it) to
- // iterate through the refs in the channel
- // TODO Note that this is only a partial solution since if there are messages paged to storage
- // it won't try those - i.e. it will only iterate through those refs in memory.
- // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
- // through them since we might run out of memory
- // So we will need to load individual refs from storage given the selector expressions
- // Secondly we should also introduce some in memory indexes here to prevent having to
- // iterate through all the refs every time
- // Having said all that, having consumers on a queue that don't match many messages
- // is an antipattern and should be avoided by the user
+ // No receiver accepted the message because no selectors matched, so we create
+ // an iterator (if we haven't already created it) to iterate through the refs
+ // in the channel. No delivery was really performed, so we decrement the
+ // delivery count
+
+ ref.decrementDeliveryCount();
+
+ // TODO Note that this is only a partial solution since if there are messages
+ // paged to storage it won't try those - i.e. it will only iterate through
+ // those refs in memory. Dealing with refs in storage is somewhat tricky since
+ // we can't just load them and iterate through them since we might run out of
+ // memory, so we will need to load individual refs from storage given the
+ // selector expressions. Secondly we should also introduce some in memory
+ // indexes here to prevent having to iterate through all the refs every time.
+ // Having said all that, having consumers on a queue that don't match many
+ // messages is an antipattern and should be avoided by the user.
if (iter == null)
{
iter = messageRefs.iterator();
@@ -731,15 +736,12 @@
{
if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
- //Receiver accepted the reference
+ // Receiver accepted the reference
- // We must synchronize here to cope with another race
- // condition where message is
- // cancelled/acked in flight while the following few
- // actions are being performed.
- // e.g. delivery could be cancelled acked after being
- // removed from state but before
- // delivery being added (observed).
+ // We must synchronize here to cope with another race condition where message
+ // is cancelled/acked in flight while the following few actions are being
+ // performed. e.g. delivery could be cancelled acked after being removed from
+ // state but before delivery being added (observed).
synchronized (del)
{
if (trace) { log.trace(this + " incrementing delivery count for " + del); }
@@ -1660,31 +1662,23 @@
{
Delivery d = null;
- ref.incrementDeliveryCount();
-
Set deliveries = router.handle(this, ref, null);
if (deliveries.isEmpty())
{
- // unsucessful delivery attempt, nobody actually accepted the message, so decrement the
- // delivery count
- ref.decrementDeliveryCount();
return null;
}
// TODO
- // Sanity check - we shouldn't get more then one delivery - the Channel
- // can only cope with
- // one delivery per message reference at any one time. Eventually this
- // will be enforced in
- // the design of the core classes but for now we just throw an Exception
+ // Sanity check - we shouldn't get more then one delivery - the Channel can only cope with
+ // one delivery per message reference at any one time. Eventually this will be enforced in
+ // the design of the core classes but for now we just throw an Exception.
if (deliveries.size() > 1)
{
- throw new IllegalStateException(
- "More than one delivery returned from router!");
+ throw new IllegalStateException("More than one delivery returned from router!");
}
- d = (Delivery) deliveries.iterator().next();
+ d = (Delivery)deliveries.iterator().next();
return d;
}
More information about the jboss-cvs-commits
mailing list