[jboss-cvs] JBoss Messaging SVN: r8536 - in branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl: clusterconnection and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 9 06:41:23 EDT 2012


Author: raggz
Date: 2012-05-09 06:41:23 -0400 (Wed, 09 May 2012)
New Revision: 8536

Modified:
   branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
JBMessaging-1922

Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2012-05-09 10:39:54 UTC (rev 8535)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2012-05-09 10:41:23 UTC (rev 8536)
@@ -296,7 +296,7 @@
             if (trace) { log.trace("Set timeout to fire in " + recoverDeliveriesTimeout); }
          }
                            
-         deliverInternal();
+         deliverInternal(theChannelID);
       }
    }  
    
@@ -551,6 +551,22 @@
 		}
 	}
    
+   protected void deliverInternal(long failedChannelID)
+   {
+      super.deliverInternal();
+      
+      if (trace) { log.trace(this + " deliverInternal"); }
+      
+      if (handleFlowControlForConsumers && getReceiversReady() &&
+          localDistributor.getNumberOfReceivers() > 0 && messageRefs.isEmpty())
+      {
+         if (trace) { log.trace("Informing suckers"); }
+         //The receivers are still ready for more messages but there is nothing left in the local queue
+         //so we inform the message suckers to start consuming (if they aren't already)
+         informSuckers(true, failedChannelID);
+      }
+   }
+   
    protected synchronized void setReceiversReady(boolean receiversReady)
    {
    	if (trace) { log.trace(this + " setReceiversReady " + receiversReady); }
@@ -601,6 +617,21 @@
       }
    }
    
+   private void informSuckers(boolean consume, long failedChannelID)
+   {
+      Iterator<MessageSucker> iter = suckers.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageSucker sucker = iter.next();
+         
+         if (sucker.getSourceChannelID() != failedChannelID)
+         {         
+            sucker.setConsuming(consume);
+         }
+      }
+   }
+   
    // Inner classes -------------------------------------------------   
    
    protected class DistributorWrapper implements Distributor

Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2012-05-09 10:39:54 UTC (rev 8535)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918_JBMessaging_1921_JBMessaging_1922/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2012-05-09 10:41:23 UTC (rev 8536)
@@ -381,4 +381,9 @@
 			log.error("Failed to forward message", e);
 		}
 	}
+
+   public long getSourceChannelID()
+   {
+      return sourceChannelID;
+   }
 }



More information about the jboss-cvs-commits mailing list