[jboss-cvs] JBoss Messaging SVN: r8527 - in branches/Branch_1_4/src/main/org/jboss/messaging/core/impl: clusterconnection and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 26 22:42:24 EDT 2012
Author: gaohoward
Date: 2012-04-26 22:42:22 -0400 (Thu, 26 Apr 2012)
New Revision: 8527
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
JBMESSAGING-1922
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2012-04-18 15:24:34 UTC (rev 8526)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2012-04-27 02:42:22 UTC (rev 8527)
@@ -296,7 +296,7 @@
if (trace) { log.trace("Set timeout to fire in " + recoverDeliveriesTimeout); }
}
- deliverInternal();
+ deliverInternal(theChannelID);
}
}
@@ -551,6 +551,22 @@
}
}
+ protected void deliverInternal(long failedChannelID)
+ {
+ super.deliverInternal();
+
+ if (trace) { log.trace(this + " deliverInternal"); }
+
+ if (handleFlowControlForConsumers && getReceiversReady() &&
+ localDistributor.getNumberOfReceivers() > 0 && messageRefs.isEmpty())
+ {
+ if (trace) { log.trace("Informing suckers"); }
+ //The receivers are still ready for more messages but there is nothing left in the local queue
+ //so we inform the message suckers to start consuming (if they aren't already)
+ informSuckers(true, failedChannelID);
+ }
+ }
+
protected synchronized void setReceiversReady(boolean receiversReady)
{
if (trace) { log.trace(this + " setReceiversReady " + receiversReady); }
@@ -601,6 +617,21 @@
}
}
+ private void informSuckers(boolean consume, long failedChannelID)
+ {
+ Iterator<MessageSucker> iter = suckers.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = iter.next();
+
+ if (sucker.getSourceChannelID() != failedChannelID)
+ {
+ sucker.setConsuming(consume);
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
protected class DistributorWrapper implements Distributor
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2012-04-18 15:24:34 UTC (rev 8526)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2012-04-27 02:42:22 UTC (rev 8527)
@@ -381,4 +381,9 @@
log.error("Failed to forward message", e);
}
}
+
+ public long getSourceChannelID()
+ {
+ return sourceChannelID;
+ }
}
More information about the jboss-cvs-commits
mailing list