[jboss-cvs] JBoss Messaging SVN: r8527 - in branches/Branch_1_4/src/main/org/jboss/messaging/core/impl: clusterconnection and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 26 22:42:24 EDT 2012


Author: gaohoward
Date: 2012-04-26 22:42:22 -0400 (Thu, 26 Apr 2012)
New Revision: 8527

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
JBMESSAGING-1922



Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2012-04-18 15:24:34 UTC (rev 8526)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2012-04-27 02:42:22 UTC (rev 8527)
@@ -296,7 +296,7 @@
             if (trace) { log.trace("Set timeout to fire in " + recoverDeliveriesTimeout); }
          }
                            
-         deliverInternal();
+         deliverInternal(theChannelID);
       }
    }  
    
@@ -551,6 +551,22 @@
 		}
 	}
    
+   //Runs the normal delivery pass, then prompts the suckers (all but failedChannelID's) when the local queue is empty
+   protected void deliverInternal(long failedChannelID)
+   {
+      super.deliverInternal();
+      if (trace) { log.trace(this + " deliverInternal"); }
+
+      //Nothing more to do unless flow control is on and the receivers are still ready for messages
+      if (!handleFlowControlForConsumers || !getReceiversReady()) { return; }
+      //Likewise when there are no local receivers, or message references are still held locally
+      if (localDistributor.getNumberOfReceivers() <= 0 || !messageRefs.isEmpty()) { return; }
+
+      if (trace) { log.trace("Informing suckers"); }
+      //Tell the message suckers to start consuming (if they aren't already); informSuckers skips failedChannelID
+      informSuckers(true, failedChannelID);
+   }
+   
    protected synchronized void setReceiversReady(boolean receiversReady)
    {
    	if (trace) { log.trace(this + " setReceiversReady " + receiversReady); }
@@ -601,6 +617,21 @@
       }
    }
    
+   //Applies the consume flag to every sucker except the one whose source channel is failedChannelID
+   private void informSuckers(boolean consume, long failedChannelID)
+   {
+      for (Iterator<MessageSucker> it = suckers.iterator(); it.hasNext(); )
+      {
+         MessageSucker candidate = it.next();
+         //Leave the sucker that sources from the failed channel untouched
+         if (candidate.getSourceChannelID() == failedChannelID)
+         {
+            continue;
+         }
+         candidate.setConsuming(consume);
+      }
+   }
+   
    // Inner classes -------------------------------------------------   
    
    protected class DistributorWrapper implements Distributor

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2012-04-18 15:24:34 UTC (rev 8526)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2012-04-27 02:42:22 UTC (rev 8527)
@@ -381,4 +381,9 @@
 			log.error("Failed to forward message", e);
 		}
 	}
+
+   public long getSourceChannelID() //read-only accessor for the sourceChannelID field
+   {
+      return this.sourceChannelID;
+   }
 }



More information about the jboss-cvs-commits mailing list