[Jboss-cvs] JBoss Messaging SVN: r1215 - trunk/src/main/org/jboss/messaging/core
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 8 09:14:03 EDT 2006
Author: timfox
Date: 2006-08-08 09:14:02 -0400 (Tue, 08 Aug 2006)
New Revision: 1215
Modified:
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
Interim race fix
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-08 13:10:56 UTC (rev 1214)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-08 13:14:02 UTC (rev 1215)
@@ -228,7 +228,7 @@
return this.handleInternal(sender, r, tx);
}
}
-
+
// DeliveryObserver implementation --------------------------
public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -441,7 +441,7 @@
*
*/
public void removeAllReferences() throws Throwable
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -571,7 +571,7 @@
}
public int messageCount()
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -643,149 +643,148 @@
while (true)
{
- //TODO simplify locking - do we really need two locks??
synchronized (refLock)
{
- synchronized (deliveryLock)
+ if (iter == null)
{
+ ref = (MessageReference) messageRefs.peekFirst();
+ }
+ else
+ {
+ if (iter.hasNext())
+ {
+ ref = (MessageReference)iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
+ }
+ }
+
+ if (ref != null)
+ {
+ // Check if message is expired (we also do this on the client
+ // side)
+ // If so ack it from the channel
+ if (ref.isExpired())
+ {
+ if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+
+ // remove and acknowledge it
if (iter == null)
{
- ref = (MessageReference) messageRefs.peekFirst();
+ removeFirstInMemory();
}
else
{
- if (iter.hasNext())
- {
- ref = (MessageReference)iter.next();
- }
- else
+ iter.remove();
+ }
+
+ Delivery delivery = new SimpleDelivery(this, ref, true);
+
+ acknowledgeInternal(delivery);
+ }
+ else
+ {
+ // Reference is not expired
+
+ // Attempt to push the ref to a receiver
+ Delivery del = push(ref);
+
+ if (del == null)
+ {
+ // no receiver, broken receiver
+ // or full receiver
+ // so we stop delivering
+ if (trace) { log.trace(this + ": no delivery returned for message"
+ + ref + " so no receiver got the message");
+ log.trace("Delivery is now complete"); }
+
+ receiversReady = false;
+
+ return;
+ }
+ else if (!del.isSelectorAccepted())
+ {
+ // No receiver accepted the message because no selectors matched
+ // So we create an iterator (if we haven't already created it) to
+ // iterate through the refs in the channel
+ // TODO Note that this is only a partial solution since if there are messages paged to storage
+ // it won't try those - i.e. it will only iterate through those refs in memory.
+ // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
+ // through them since we might run out of memory
+ // So we will need to load individual refs from storage given the selector expressions
+ // Secondly we should also introduce some in memory indexes here to prevent having to
+ // iterate through all the refs every time
+ // Having said all that, having consumers on a queue that don't match many messages
+ // is an antipattern and should be avoided by the user
+ if (iter == null)
{
- ref = null;
- }
+ iter = messageRefs.iterator();
+ }
}
-
- if (ref != null)
+ else
{
- // Check if message is expired (we also do this on the client
- // side)
- // If so ack it from the channel
- if (ref.isExpired())
+ if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+
+ //Receiver accepted the reference
+
+ // We must synchronize here to cope with another race
+ // condition where message is
+ // cancelled/acked in flight while the following few
+ // actions are being performed.
+ // e.g. delivery could be cancelled acked after being
+ // removed from state but before
+ // delivery being added (observed).
+ synchronized (del)
{
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
- // remove and acknowledge it
- if (iter == null)
+ if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+
+ // FIXME - It's actually possible the delivery could be
+ // cancelled before it reaches
+ // here, in which case we wouldn't get a delivery but we
+ // still need to increment the
+ // delivery count
+ // All the problems related to these race conditions and
+ // fiddly edge cases will disappear
+ // once we do
+ // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+ // This will make life a lot easier
+
+ del.getReference().incrementDeliveryCount();
+
+ if (!del.isCancelled())
{
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
-
- Delivery delivery = new SimpleDelivery(this, ref, true);
-
- acknowledgeInternal(delivery);
- }
- else
- {
- // Reference is not expired
-
- // Attempt to push the ref to a receiver
- Delivery del = push(ref);
-
- if (del == null)
- {
- // no receiver, broken receiver
- // or full receiver
- // so we stop delivering
- if (trace) { log.trace(this + ": no delivery returned for message"
- + ref + " so no receiver got the message");
- log.trace("Delivery is now complete"); }
-
- receiversReady = false;
-
- return;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched
- // So we create an iterator (if we haven't already created it) to
- // iterate through the refs in the channel
- // TODO Note that this is only a partial solution since if there are messages paged to storage
- // it won't try those - i.e. it will only iterate through those refs in memory.
- // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
- // through them since we might run out of memory
- // So we will need to load individual refs from storage given the selector expressions
- // Secondly we should also introduce some in memory indexes here to prevent having to
- // iterate through all the refs every time
- // Having said all that, having consumers on a queue that don't match many messages
- // is an antipattern and should be avoided by the user
if (iter == null)
{
- iter = messageRefs.iterator();
- }
- }
- else
- {
- if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
- //Receiver accepted the reference
-
- // We must synchronize here to cope with another race
- // condition where message is
- // cancelled/acked in flight while the following few
- // actions are being performed.
- // e.g. delivery could be cancelled acked after being
- // removed from state but before
- // delivery being added (observed).
- synchronized (del)
+ removeFirstInMemory();
+ }
+ else
{
- if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-
- // FIXME - It's actually possible the delivery could be
- // cancelled before it reaches
- // here, in which case we wouldn't get a delivery but we
- // still need to increment the
- // delivery count
- // All the problems related to these race conditions and
- // fiddly edge cases will disappear
- // once we do
- // http://jira.jboss.com/jira/browse/JBMESSAGING-355
- // This will make life a lot easier
-
- del.getReference().incrementDeliveryCount();
-
- if (!del.isCancelled())
+ iter.remove();
+ }
+
+ // delivered
+ if (!del.isDone())
+ {
+ // Add the delivery to state
+ synchronized (deliveryLock)
{
- if (iter == null)
- {
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
-
- // delivered
- if (!del.isDone())
- {
- // Add the delivery to state
- deliveries.add(del);
- }
+ deliveries.add(del);
}
}
}
}
}
- else
- {
- // No more refs in channel
- if (trace) { log.trace(this + " no more refs to deliver "); }
- break;
- }
}
}
+ else
+ {
+ // No more refs in channel
+ if (trace) { log.trace(this + " no more refs to deliver "); }
+ break;
+ }
}
}
catch (Throwable t)
@@ -1705,7 +1704,7 @@
throw new IllegalStateException(this + " closed");
}
}
-
+
// Inner classes -------------------------------------------------
private class DeliveryRunnable implements Runnable
@@ -1748,6 +1747,5 @@
Delivery d = handleInternal(sender, routable, null);
result.setResult(d);
}
- }
-
+ }
}
More information about the jboss-cvs-commits mailing list