[Jboss-cvs] JBoss Messaging SVN: r1213 - in trunk: src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms/server/destination
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 8 07:47:53 EDT 2006
Author: timfox
Date: 2006-08-08 07:47:48 -0400 (Tue, 08 Aug 2006)
New Revision: 1213
Modified:
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-501 interim
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-08 11:25:08 UTC (rev 1212)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-08 11:47:48 UTC (rev 1213)
@@ -643,148 +643,149 @@
while (true)
{
+ //TODO simplify locking - do we really need two locks??
synchronized (refLock)
{
- if (iter == null)
+ synchronized (deliveryLock)
{
- ref = (MessageReference) messageRefs.peekFirst();
- }
- else
- {
- if (iter.hasNext())
- {
- ref = (MessageReference)iter.next();
- }
- else
- {
- ref = null;
- }
- }
- }
-
- if (ref != null)
- {
- // Check if message is expired (we also do this on the client
- // side)
- // If so ack it from the channel
- if (ref.isExpired())
- {
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
- // remove and acknowledge it
if (iter == null)
{
- removeFirstInMemory();
+ ref = (MessageReference) messageRefs.peekFirst();
}
else
{
- iter.remove();
- }
-
- Delivery delivery = new SimpleDelivery(this, ref, true);
-
- acknowledgeInternal(delivery);
- }
- else
- {
- // Reference is not expired
-
- // Attempt to push the ref to a receiver
- Delivery del = push(ref);
-
- if (del == null)
- {
- // no receiver, broken receiver
- // or full receiver
- // so we stop delivering
- if (trace) { log.trace(this + ": no delivery returned for message"
- + ref + " so no receiver got the message");
- log.trace("Delivery is now complete"); }
-
- receiversReady = false;
-
- return;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched
- // So we create an iterator (if we haven't already created it) to
- // iterate through the refs in the channel
- // TODO Note that this is only a partial solution since if there are messages paged to storage
- // it won't try those - i.e. it will only iterate through those refs in memory.
- // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
- // through them since we might run out of memory
- // So we will need to load individual refs from storage given the selector expressions
- // Secondly we should also introduce some in memory indexes here to prevent having to
- // iterate through all the refs every time
- // Having said all that, having consumers on a queue that don't match many messages
- // is an antipattern and should be avoided by the user
- if (iter == null)
+ if (iter.hasNext())
+ {
+ ref = (MessageReference)iter.next();
+ }
+ else
{
- iter = messageRefs.iterator();
- }
+ ref = null;
+ }
}
- else
+
+ if (ref != null)
{
- if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
- //Receiver accepted the reference
-
- // We must synchronize here to cope with another race
- // condition where message is
- // cancelled/acked in flight while the following few
- // actions are being performed.
- // e.g. delivery could be cancelled acked after being
- // removed from state but before
- // delivery being added (observed).
- synchronized (del)
+ // Check if message is expired (we also do this on the client
+ // side)
+ // If so ack it from the channel
+ if (ref.isExpired())
{
- if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-
- // FIXME - It's actually possible the delivery could be
- // cancelled before it reaches
- // here, in which case we wouldn't get a delivery but we
- // still need to increment the
- // delivery count
- // All the problems related to these race conditions and
- // fiddly edge cases will disappear
- // once we do
- // http://jira.jboss.com/jira/browse/JBMESSAGING-355
- // This will make life a lot easier
-
- del.getReference().incrementDeliveryCount();
-
- if (!del.isCancelled())
+ if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+
+ // remove and acknowledge it
+ if (iter == null)
{
+ removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
+
+ Delivery delivery = new SimpleDelivery(this, ref, true);
+
+ acknowledgeInternal(delivery);
+ }
+ else
+ {
+ // Reference is not expired
+
+ // Attempt to push the ref to a receiver
+ Delivery del = push(ref);
+
+ if (del == null)
+ {
+ // no receiver, broken receiver
+ // or full receiver
+ // so we stop delivering
+ if (trace) { log.trace(this + ": no delivery returned for message"
+ + ref + " so no receiver got the message");
+ log.trace("Delivery is now complete"); }
+
+ receiversReady = false;
+
+ return;
+ }
+ else if (!del.isSelectorAccepted())
+ {
+ // No receiver accepted the message because no selectors matched
+ // So we create an iterator (if we haven't already created it) to
+ // iterate through the refs in the channel
+ // TODO Note that this is only a partial solution since if there are messages paged to storage
+ // it won't try those - i.e. it will only iterate through those refs in memory.
+ // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
+ // through them since we might run out of memory
+ // So we will need to load individual refs from storage given the selector expressions
+ // Secondly we should also introduce some in memory indexes here to prevent having to
+ // iterate through all the refs every time
+ // Having said all that, having consumers on a queue that don't match many messages
+ // is an antipattern and should be avoided by the user
if (iter == null)
{
- removeFirstInMemory();
- }
- else
+ iter = messageRefs.iterator();
+ }
+ }
+ else
+ {
+ if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+
+ //Receiver accepted the reference
+
+ // We must synchronize here to cope with another race
+ // condition where message is
+ // cancelled/acked in flight while the following few
+ // actions are being performed.
+ // e.g. delivery could be cancelled acked after being
+ // removed from state but before
+ // delivery being added (observed).
+ synchronized (del)
{
- iter.remove();
- }
-
- // delivered
- if (!del.isDone())
- {
- // Add the delivery to state
- synchronized (deliveryLock)
+ if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+
+ // FIXME - It's actually possible the delivery could be
+ // cancelled before it reaches
+ // here, in which case we wouldn't get a delivery but we
+ // still need to increment the
+ // delivery count
+ // All the problems related to these race conditions and
+ // fiddly edge cases will disappear
+ // once we do
+ // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+ // This will make life a lot easier
+
+ del.getReference().incrementDeliveryCount();
+
+ if (!del.isCancelled())
{
- deliveries.add(del);
+ if (iter == null)
+ {
+ removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
+
+ // delivered
+ if (!del.isDone())
+ {
+ // Add the delivery to state
+ deliveries.add(del);
+ }
}
}
}
}
}
+ else
+ {
+ // No more refs in channel
+ if (trace) { log.trace(this + " no more refs to deliver "); }
+ break;
+ }
}
}
- else
- {
- // No more refs in channel
- if (trace) { log.trace(this + " no more refs to deliver "); }
- break;
- }
}
}
catch (Throwable t)
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java 2006-08-08 11:25:08 UTC (rev 1212)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java 2006-08-08 11:47:48 UTC (rev 1213)
@@ -568,7 +568,7 @@
// Start the connection for delivery
conn.start();
-
+
// Remove all messages from the topic
ServerManagement.invoke(destObjectName, "removeAllMessages", null, null);
More information about the jboss-cvs-commits
mailing list