[jboss-cvs] JBoss Messaging SVN: r2346 - in trunk/src/main/org/jboss/jms: server/endpoint and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 19 07:30:36 EST 2007


Author: timfox
Date: 2007-02-19 07:30:36 -0500 (Mon, 19 Feb 2007)
New Revision: 2346

Modified:
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
Tweaks to flow control


Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-19 11:59:00 UTC (rev 2345)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-19 12:30:36 UTC (rev 2346)
@@ -198,8 +198,9 @@
    private boolean listenerRunning;
    private int maxDeliveries;
    private long channelID;
-   private boolean startSendingMessageSent;
    private long lastDeliveryId = -1;
+   //private volatile boolean sentFull;
+   //private volatile boolean sentEmpty;
         
    // Constructors ---------------------------------------------------------------------------------
 
@@ -226,7 +227,6 @@
       mainLock = new Object();
       this.sessionExecutor = sessionExecutor;
       this.maxDeliveries = maxDeliveries;
-      this.startSendingMessageSent = true;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -260,18 +260,9 @@
          
          if (trace) { log.trace(this + " added message(s) to the buffer"); }
          
-         messageAdded();
+         messageAdded(); 
          
-         if (buffer.size() >= maxBufferSize)
-         {
-            if (trace) { log.trace(this + " is full"); }
-            
-            //We are full. Send message to server to tell it to stop sending
-            
-            startSendingMessageSent = false;
-            
-            sendChangeRateMessage(0);
-         }
+         checkBufferSize();         
       }
    }
          
@@ -480,22 +471,60 @@
       } 
       
       //This needs to be outside the lock
-      if (!startSendingMessageSent && buffer.size() <= minBufferSize)
-      {
-         //Tell the server we need more messages - but we don't want to keep sending the message
-         //if we've already sent it - hence the check
-         startSendingMessageSent = true;
-            
-         if (trace) { log.trace("telling server to start resume sending messages, buffer size is " + buffer.size()); }
-         
-         sendChangeRateMessage(1);                    
-      }
+      checkBufferSize();
       
       if (trace) { log.trace(this + " receive() returning " + m); }
       
       return m;
-   }    
+   } 
    
+   //We can optimise so it just uses one int to store both flags
+   
+   private volatile boolean sentStop;
+   
+   private volatile boolean sentStart = true;
+   
+   private void checkBufferSize()
+   {
+      int size = buffer.size();
+      
+      if (!sentStart && size <= minBufferSize)
+      {
+         //We need more messages - we need to tell the server this if we haven't done so already
+         
+         sendChangeRateMessage(1.0f);
+         
+         sentStart = true;
+         
+         sentStop = false;
+      }
+      else if (!sentStop && size >= maxBufferSize)
+      {
+         //Our buffer is full - we need to tell the server to stop sending if we haven't
+         //done so already
+         
+         sendChangeRateMessage(0f);
+         
+         sentStop = true;
+         
+         sentStart = false;
+      }
+   }
+   
+   private void sendChangeRateMessage(float newRate) 
+   {
+      try
+      {
+         // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
+         // job to detect it and turn it into a remoting one way invocation.
+         consumerDelegate.changeRate(newRate);
+      }
+      catch (JMSException e)
+      {
+         log.error("Failed to send changeRate message", e);
+      }
+   }
+   
    public MessageListener getMessageListener()
    {
       return listener;      
@@ -584,19 +613,7 @@
       }
    }
    
-   private void sendChangeRateMessage(float newRate) 
-   {
-      try
-      {
-         // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
-         // job to detect it and turn it into a remoting one way invocation.
-         consumerDelegate.changeRate(newRate);
-      }
-      catch (JMSException e)
-      {
-         log.error("Failed to send changeRate message", e);
-      }
-   }
+
    
    private void queueRunner(ListenerRunner runner)
    {
@@ -794,19 +811,9 @@
                log.error("Failed to deliver message", e);
             } 
          }
+                  
+         checkBufferSize();
          
-         
-         // Tell the server we need more messages - but we don't want to keep sending the message
-         // if we've already sent it - hence the check
-         if (!startSendingMessageSent && buffer.size() <= minBufferSize)
-         {                    
-            startSendingMessageSent = true;
-            
-            if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }            
-            
-            sendChangeRateMessage(1);
-         } 
-         
          if (again)
          {
             // Queue it up again

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-19 11:59:00 UTC (rev 2345)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-19 12:30:36 UTC (rev 2346)
@@ -384,8 +384,13 @@
          
          if (clientAccepting)
          {
+            log.info(this + "Toggle on");
             promptDelivery();
          }            
+         else
+         {
+            log.info(this + "Toggle off");
+         }
       }   
       catch (Throwable t)
       {




More information about the jboss-cvs-commits mailing list