[jboss-cvs] JBoss Messaging SVN: r8536 - in branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl: clusterconnection and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 9 06:41:23 EDT 2012
Author: raggz
Date: 2012-05-09 06:41:23 -0400 (Wed, 09 May 2012)
New Revision: 8536
Modified:
branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
JBMessaging-1922
Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2012-05-09 10:39:54 UTC (rev 8535)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2012-05-09 10:41:23 UTC (rev 8536)
@@ -296,7 +296,7 @@
if (trace) { log.trace("Set timeout to fire in " + recoverDeliveriesTimeout); }
}
- deliverInternal();
+ deliverInternal(theChannelID);
}
}
@@ -551,6 +551,22 @@
}
}
+ protected void deliverInternal(long failedChannelID)
+ {
+ super.deliverInternal();
+
+ if (trace) { log.trace(this + " deliverInternal"); }
+
+ if (handleFlowControlForConsumers && getReceiversReady() &&
+ localDistributor.getNumberOfReceivers() > 0 && messageRefs.isEmpty())
+ {
+ if (trace) { log.trace("Informing suckers"); }
+ //The receivers are still ready for more messages but there is nothing left in the local queue
+ //so we inform the message suckers to start consuming (if they aren't already)
+ informSuckers(true, failedChannelID);
+ }
+ }
+
protected synchronized void setReceiversReady(boolean receiversReady)
{
if (trace) { log.trace(this + " setReceiversReady " + receiversReady); }
@@ -601,6 +617,21 @@
}
}
+ private void informSuckers(boolean consume, long failedChannelID)
+ {
+ Iterator<MessageSucker> iter = suckers.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = iter.next();
+
+ if (sucker.getSourceChannelID() != failedChannelID)
+ {
+ sucker.setConsuming(consume);
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
protected class DistributorWrapper implements Distributor
Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2012-05-09 10:39:54 UTC (rev 8535)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2012-05-09 10:41:23 UTC (rev 8536)
@@ -381,4 +381,9 @@
log.error("Failed to forward message", e);
}
}
+
+ public long getSourceChannelID()
+ {
+ return sourceChannelID;
+ }
}
More information about the jboss-cvs-commits
mailing list