[jboss-cvs] JBoss Messaging SVN: r1667 - branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 30 19:09:22 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-30 19:09:21 -0500 (Thu, 30 Nov 2006)
New Revision: 1667
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-660 - Fix provided by Tim Fox
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-11-30 18:47:04 UTC (rev 1666)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-01 00:09:21 UTC (rev 1667)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,7 +32,6 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
-
import org.jboss.logging.Logger;
import org.jboss.messaging.core.memory.MemoryManager;
import org.jboss.messaging.core.plugin.contract.MessageStore;
@@ -42,9 +43,6 @@
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
-
/**
* Channel implementation. It supports atomicity, isolation and recoverability of reliable messages.
* The channel implementation here uses a "SEDA-type" approach, where requests to handle messages,
@@ -52,7 +50,7 @@
* queue and executed serially by a single thread. This prevents lock contention since requests are
* executed serially, resulting in better scalability and higher throughput at the expense of some
* latency.
- *
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt> $Id: ChannelSupport.java,v 1.65
@@ -203,11 +201,11 @@
public Delivery handle(DeliveryObserver sender, Routable r, Transaction tx)
{
checkClosed();
-
+
Future result = new Future();
if (tx == null)
- {
+ {
try
{
// Instead of executing directly, we add the handle request to the event queue.
@@ -220,7 +218,7 @@
{
log.warn("Thread interrupted", e);
}
-
+
return (Delivery)result.getResult();
}
else
@@ -228,7 +226,7 @@
return this.handleInternal(sender, r, tx);
}
}
-
+
// DeliveryObserver implementation --------------------------
public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -252,7 +250,7 @@
// Future result = new Future();
//
// this.executor.execute(new AcknowledgeRunnable(d, result));
- //
+ //
// //For now we wait for result, but this may not be necessary
// result.getResult();
// }
@@ -284,9 +282,9 @@
// try
// {
// Future result = new Future();
- //
+ //
// this.executor.execute(new CancelRunnable(d, result));
- //
+ //
// //For now we wait for result, but this may not be necessary
// result.getResult();
// }
@@ -296,13 +294,13 @@
// }
// Exception e = new Exception();
-//
+//
// log.error("cancelling delivery: " + d, e);
-//
-
-
+//
+
+
// TODO We should also consider executing cancels on the event queue
- cancelInternal(d);
+ cancelInternal(d);
}
// Distributor implementation ------------------------------------
@@ -314,7 +312,7 @@
boolean added = router.add(r);
if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
-
+
receiversReady = true;
return added;
}
@@ -372,7 +370,7 @@
log.trace(this + " browse"
+ (filter == null ? "" : ", filter = " + filter));
}
-
+
synchronized (deliveryLock)
{
synchronized (refLock)
@@ -380,14 +378,14 @@
//FIXME - This is currently broken since it doesn't take into account
// refs paged into persistent storage
// Also is very inefficient since it makes a copy
-
+
//TODO use the ref queue iterator
List references = delivering(filter);
-
- List undel = undelivered(filter);
+ List undel = undelivered(filter);
+
references.addAll(undel);
-
+
// dereference pass
ArrayList messages = new ArrayList(references.size());
for (Iterator i = references.iterator(); i.hasNext();)
@@ -397,25 +395,25 @@
}
return messages;
}
- }
+ }
}
public void deliver(boolean synchronous)
{
- checkClosed();
-
+ checkClosed();
+
// We put a delivery request on the event queue.
try
{
Future future = null;
-
+
if (synchronous)
{
future = new Future();
}
-
+
this.executor.execute(new DeliveryRunnable(future));
-
+
if (synchronous)
{
// Wait to complete
@@ -435,25 +433,25 @@
router.clear();
router = null;
}
-
- }
-
+
+ }
+
/*
* This method clears the channel.
* Basically it acknowledges any outstanding deliveries and consumes the rest of the messages in the channel.
* We can't just delete the corresponding references directly from the database since
* a) We might be paging
* b) The message might remain in the message store causing a leak
- *
+ *
*/
public void removeAllReferences() throws Throwable
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
{
//Ack the deliveries
-
+
//Clone to avoid ConcurrentModificationException
Set dels = new HashSet(deliveries);
@@ -461,26 +459,26 @@
while (iter.hasNext())
{
SimpleDelivery d = (SimpleDelivery) iter.next();
-
+
d.acknowledge(null);
}
-
+
//Now we consume the rest of the messages
//This may take a while if we have a lot of messages including perhaps millions
//paged in the database - but there's no obvious other way to do it.
//We cannot just delete them directly from the database - because we may end up with messages leaking
//in the message store,
//also we might get race conditions when other channels are updating the same message in the db
-
+
//Note - we don't do this in a tx - because the tx could be too big if we have millions of refs
//paged in storage
-
+
MessageReference ref;
while ((ref = removeFirstInMemory()) != null)
{
SimpleDelivery del = new SimpleDelivery(this, ref, false);
-
- del.acknowledge(null);
+
+ del.acknowledge(null);
}
}
}
@@ -580,7 +578,7 @@
* Returns the count of messages stored AND being delivered.
*/
public int messageCount()
- {
+ {
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -636,8 +634,8 @@
/*
* This methods delivers as many messages as possible to the router until no
* more deliveries are returned. This method should never be called at the
- * same time as handle.
- *
+ * same time as handle.
+ *
* @see org.jboss.messaging.core.Channel#deliver()
*/
protected void deliverInternal()
@@ -647,16 +645,17 @@
// The iterator is used to iterate through the refs in the channel in the case that they
// don't match the selectors of any receivers.
ListIterator iter = null;
-
+
MessageReference ref = null;
- synchronized (refLock)
+ while (true)
{
- while (true)
+ synchronized (refLock)
{
if (iter == null)
{
- ref = (MessageReference)messageRefs.peekFirst();
+ //ref = (MessageReference) messageRefs.peekFirst();
+ ref = removeFirstInMemory();
}
else
{
@@ -669,127 +668,150 @@
ref = null;
}
}
+ }
- if (ref != null)
+ if (ref != null)
+ {
+ // Check if message is expired (we also do this on the client side)
+ // If so ack it from the channel
+ if (ref.isExpired())
{
- if (trace) { log.trace(this + " pushing " + ref); }
+ if (trace) { log.trace("Message reference: " + ref + " has expired"); }
- // Check if message is expired (we also do this on the client side). If so ack it
- // from the channel.
+ // remove and acknowledge it
+ if (iter == null)
+ {
+ //already removed
+ //removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
- if (ref.isExpired())
+ Delivery delivery = new SimpleDelivery(this, ref, true);
+
+ acknowledgeInternal(delivery);
+ }
+ else
+ {
+ // Reference is not expired
+
+ // Attempt to push the ref to a receiver
+ Delivery del = push(ref);
+
+ if (del == null)
{
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+ // No receiver, broken receiver or full receiver so we stop delivering; also
+ // we need to decrement the delivery count, as no real delivery has been
+ // actually performed
- // remove and acknowledge it
+ if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+
+ receiversReady = false;
+
if (iter == null)
{
- removeFirstInMemory();
+ // add the message back
+ synchronized (refLock)
+ {
+ messageRefs.addFirst(ref, ref.getPriority());
+ }
}
else
{
- iter.remove();
+ //we didn't remove it in the first place
}
- Delivery delivery = new SimpleDelivery(this, ref, true);
-
- acknowledgeInternal(delivery);
+ return;
}
- else
+ else if (!del.isSelectorAccepted())
{
- // Reference is not expired
+ // No receiver accepted the message because no selectors matched, so we create
+ // an iterator (if we haven't already created it) to iterate through the refs
+ // in the channel.
- // Attempt to push the ref to a receiver
- Delivery del = push(ref);
+ // TODO Note that this is only a partial solution since if there are messages
+ // paged to storage it won't try those - i.e. it will only iterate through
+ // those refs in memory. Dealing with refs in storage is somewhat tricky since
+ // we can't just load them and iterate through them since we might run out of
+ // memory, so we will need to load individual refs from storage given the
+ // selector expressions. Secondly we should also introduce some in memory
+ // indexes here to prevent having to iterate through all the refs every time.
+ // Having said all that, having consumers on a queue that don't match many
+ // messages is an antipattern and should be avoided by the user.
- if (del == null)
+ if (iter == null)
{
- // No receiver, broken receiver or full receiver so we stop delivering; also
- // we need to decrement the delivery count, as no real delivery has been
- // actually performed
+ //Add the message back
- if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+ synchronized (refLock)
+ {
+ messageRefs.addFirst(ref, ref.getPriority());
- receiversReady = false;
+ iter = messageRefs.iterator();
- return;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched, so we create
- // an iterator (if we haven't already created it) to iterate through the refs
- // in the channel.
+ //And skip the next one (that's the one we just added back)
- // TODO Note that this is only a partial solution since if there are messages
- // paged to storage it won't try those - i.e. it will only iterate through
- // those refs in memory. Dealing with refs in storage is somewhat tricky since
- // we can't just load them and iterate through them since we might run out of
- // memory, so we will need to load individual refs from storage given the
- // selector expressions. Secondly we should also introduce some in memory
- // indexes here to prevent having to iterate through all the refs every time.
- // Having said all that, having consumers on a queue that don't match many
- // messages is an antipattern and should be avoided by the user.
- if (iter == null)
- {
- iter = messageRefs.iterator();
+ iter.next();
}
}
- else
+ }
+ else
+ {
+ if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+
+ // Receiver accepted the reference
+
+ // We must synchronize here to cope with another race condition where message
+ // is cancelled/acked in flight while the following few actions are being
+ // performed. e.g. delivery could be cancelled acked after being removed from
+ // state but before delivery being added (observed).
+ synchronized (del)
{
- if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+ if (trace) { log.trace(this + " incrementing delivery count for " + del); }
- // Receiver accepted the reference
+ // FIXME - It's actually possible the delivery could be
+ // cancelled before it reaches
+ // here, in which case we wouldn't get a delivery but we
+ // still need to increment the delivery count
+ // All the problems related to these race conditions and
+ // fiddly edge cases will disappear once we do
+ // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+ // This will make life a lot easier
- // We must synchronize here to cope with another race condition where message
- // is cancelled/acked in flight while the following few actions are being
- // performed. e.g. delivery could be cancelled acked after being removed from
- // state but before delivery being added (observed).
- synchronized (del)
+ if (!del.isCancelled())
{
- // FIXME - It's actually possible the delivery could be
- // cancelled before it reaches
- // here, in which case we wouldn't get a delivery but we
- // still need to increment the
- // delivery count
- // All the problems related to these race conditions and
- // fiddly edge cases will disappear
- // once we do
- // http://jira.jboss.com/jira/browse/JBMESSAGING-355
- // This will make life a lot easier
+ if (iter == null)
+ {
+ //do nothing - already removed
+ //removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
- if (!del.isCancelled())
+ // delivered
+ if (!del.isDone())
{
- if (iter == null)
+ // Add the delivery to state
+ synchronized (deliveryLock)
{
- removeFirstInMemory();
+ deliveries.add(del);
}
- else
- {
- iter.remove();
- if (trace) { log.trace(this + " removed current message from iterator"); }
- }
-
- // delivered
- if (!del.isDone())
- {
- synchronized (deliveryLock)
- {
- deliveries.add(del);
- if (trace) { log.trace(this + " starting to track " + del); }
- }
- }
}
}
}
}
}
- else
- {
- // No more refs in channel
- if (trace) { log.trace(this + " no more refs to deliver "); }
- break;
- }
}
+ else
+ {
+ // No more refs in channel
+ if (trace) { log.trace(this + " no more refs to deliver "); }
+ break;
+ }
}
}
catch (Throwable t)
@@ -819,9 +841,9 @@
//pages and is non recoverable a reliable ref will be paged in the database as reliable
//which makes them hard to remove on server restart.
//If we always page them as unreliable then it is easy to remove them.
- ref.setReliable(false);
+ ref.setReliable(false);
}
-
+
if (tx == null)
{
// Don't even attempt synchronous delivery for a reliable message
@@ -839,18 +861,18 @@
return null;
}
- checkMemory();
+ checkMemory();
ref.setOrdering(messageOrdering.increment());
-
+
if (ref.isReliable() && recoverable)
{
// Reliable message in a recoverable state - also add to db
- if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+ if (trace) { log.trace(this + "adding " + ref + " to database non-transactionally"); }
- pm.addReference(channelID, ref, null);
+ pm.addReference(channelID, ref, null);
}
-
+
addReferenceInMemory(ref);
// We only do delivery if there are receivers that haven't said they don't want
@@ -863,26 +885,39 @@
}
else
{
+ if (trace) { log.trace(this + "adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
+
checkMemory();
if (ref.isReliable() && !acceptReliableMessages)
{
- // This transaction has no chance to succeed, since a reliable message cannot be
- // safely stored by a non-recoverable state, so doom the transaction.
- if (trace) { log.trace(this + " cannot handle reliable messages, dooming the transaction"); }
+ // this transaction has no chance to succeed, since a reliable
+ // message cannot be
+ // safely stored by a non-recoverable state, so doom the
+ // transaction
+ if (trace)
+ {
+ log.trace(this + " cannot handle reliable messages, dooming the transaction");
+ }
tx.setRollbackOnly();
- }
+ }
else
{
// add to post commit callback
ref.setOrdering(messageOrdering.increment());
this.getCallback(tx).addRef(ref);
- if (trace) { log.trace(this + " added " + ref + " to memory transactional callback, in transaction: " + tx); }
+ if (trace)
+ {
+ log.trace(this + " added transactionally " + ref
+ + " in memory");
+ }
}
if (ref.isReliable() && recoverable)
{
// Reliable message in a recoverable state - also add to db
+ if (trace) { log.trace(this + "adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
+
pm.addReference(channelID, ref, tx);
}
}
@@ -901,23 +936,26 @@
}
protected void acknowledgeInternal(Delivery d) throws Exception
- {
+ {
synchronized (deliveryLock)
{
acknowledgeInMemory(d);
}
-
+
if (recoverable && d.getReference().isReliable())
{
pm.removeReference(channelID, d.getReference(), null);
}
-
- d.getReference().releaseMemoryReference();
+
+ d.getReference().releaseMemoryReference();
}
protected void cancelInternal(Delivery del) throws Exception
{
- if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
+ if (trace)
+ {
+ log.trace(this + " cancelling " + del + " in memory");
+ }
boolean removed;
@@ -953,14 +991,14 @@
refsInStorage++;
}
}
-
- // We may need to update the delivery count in the database
+
+ //We may need to update the delivery count in the database
if (ref.isReliable())
{
pm.updateDeliveryCount(this.channelID, ref);
}
- if (trace) { log.trace(this + " added " + ref + " back into memory, ready for redelivery"); }
+ if (trace) { log.trace(this + " added " + ref + " back into state"); }
}
}
@@ -968,7 +1006,7 @@
{
synchronized (refLock)
{
- MessageReference result = (MessageReference)messageRefs.removeFirst();
+ MessageReference result = (MessageReference) messageRefs.removeFirst();
if (refsInStorage > 0)
{
@@ -984,8 +1022,7 @@
paging = false;
}
- if (trace) { log.trace(this + " removing first message in memory, which is " + result); }
- return (MessageReference)result;
+ return (MessageReference) result;
}
}
@@ -1029,21 +1066,21 @@
// if (mm != null)
// {
// boolean isLow = mm.isMemoryLow();
- //
+ //
// if (isLow)
// {
- //
+ //
// synchronized (refLock)
// {
// if (!paging)
// {
// log.info("Memory is low:" + this);
- //
+ //
// fullSize = messageRefs.size() + 1;
- //
+ //
// //TODO Make this configurable
// pageSize = downCacheSize = Math.max(1, fullSize / 50);
- //
+ //
// log.info("Turned paging on, fullSize=" + fullSize + " dc:" +
// downCacheSize + " ps: " + pageSize);
// }
@@ -1051,7 +1088,7 @@
// {
// //log.info("already paging");
// }
- //
+ //
// }
// }
// }
@@ -1077,7 +1114,11 @@
{
messageRefs.addLast(ref, ref.getPriority());
- if (trace) { log.trace(this + " added " + ref + " in memory"); }
+ if (trace)
+ {
+ log.trace(this + " added " + ref
+ + " non-transactionally in memory");
+ }
if (messageRefs.size() == fullSize)
{
@@ -1232,7 +1273,7 @@
{
log.trace(this + " removed " + d + " from memory:" + removed);
}
-
+
return removed;
}
@@ -1250,7 +1291,7 @@
// We may load less than desired due to "holes" - this is ok
int numberLoaded = refInfos.size();
-
+
if (numberLoaded == 0)
{
throw new IllegalStateException(
@@ -1313,7 +1354,7 @@
// return a reference
// to the pre-existing message
MessageReference ref = ms.reference(m);
-
+
refMap.put(new Long(m.getMessageID()), ref);
}
}
@@ -1348,15 +1389,15 @@
ref.setDeliveryCount(info.getDeliveryCount());
ref.setOrdering(info.getOrdering());
-
+
//We ignore the reliable field from the message - this is because reliable might be true on the message
//but this is a non recoverable state
-
+
//FIXME - Really the message shouldn't have a reliable field at all,
//Reliability is an attribute of the message reference, not the message
-
+
ref.setReliable(info.isReliable());
-
+
messageRefs.addLast(ref, ref.getPriority());
if (recoverable && ref.isReliable())
@@ -1418,8 +1459,8 @@
return callback;
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -1573,7 +1614,7 @@
{
MessageReference ref = (MessageReference) iter.next();
- if (trace) { log.trace(this + " adding " + ref + " to memory"); }
+ if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
try
{
@@ -1669,7 +1710,7 @@
return d;
}
-
+
private void checkClosed()
{
if (router == null)
@@ -1677,18 +1718,18 @@
throw new IllegalStateException(this + " closed");
}
}
-
+
// Inner classes -------------------------------------------------
private class DeliveryRunnable implements Runnable
{
Future result;
-
+
DeliveryRunnable(Future result)
{
this.result = result;
}
-
+
public void run()
{
receiversReady = true;
@@ -1720,5 +1761,5 @@
Delivery d = handleInternal(sender, routable, null);
result.setResult(d);
}
- }
+ }
}
More information about the jboss-cvs-commits
mailing list