[jboss-cvs] JBoss Messaging SVN: r3541 - in trunk: src/main/org/jboss/jms/server/endpoint and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 7 10:59:53 EST 2008


Author: timfox
Date: 2008-01-07 10:59:53 -0500 (Mon, 07 Jan 2008)
New Revision: 3541

Removed:
   trunk/tests/src/org/jboss/test/thirdparty/remoting/
Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
Log:
Merged flow control change from Branch_Stable and removed old remoting transport tests


Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-07 15:59:53 UTC (rev 3541)
@@ -201,18 +201,18 @@
    private int ackMode;
    private boolean closed;
    private Object mainLock;
-   private int maxBufferSize;
-   private int minBufferSize;
    private QueuedExecutor sessionExecutor;
    private boolean listenerRunning;
    private int maxDeliveries;
    private String queueName;
    private long lastDeliveryId = -1;
-   private volatile boolean serverSending = true;
    private boolean waitingForLastDelivery;
    private boolean shouldAck;
-   private boolean handleFlowControl;
    private long redeliveryDelay;
+   private boolean paused;      
+   private int consumeCount;
+   private boolean firstTime = true;
+   private int bufferSize;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -228,8 +228,6 @@
          throw new IllegalArgumentException(this + " bufferSize must be > 0");
       }
               
-      this.maxBufferSize = bufferSize;
-      this.minBufferSize = bufferSize / 2;
       buffer = new BasicPriorityLinkedList(10);
       isConnectionConsumer = isCC;
       this.ackMode = ackMode;
@@ -241,8 +239,8 @@
       this.sessionExecutor = sessionExecutor;
       this.maxDeliveries = maxDeliveries;
       this.shouldAck = shouldAck;
-      this.handleFlowControl = handleFlowControl;
       this.redeliveryDelay = redeliveryDelay;
+      this.bufferSize = bufferSize;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -281,11 +279,6 @@
          if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
 
          messageAdded();
-
-         if (handleFlowControl)
-         {
-            checkStop();
-         }
       }
    }
 
@@ -526,13 +519,6 @@
          }
       } 
       
-      //This needs to be outside the lock
-
-      if (handleFlowControl)
-      {
-      	checkStart();
-      }
-      
       if (trace) { log.trace(this + " receive() returning " + m); }
       
       return m;
@@ -564,6 +550,8 @@
       {
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
+         consumeCount--;
+         
          messageAdded();
       }
    }
@@ -583,8 +571,7 @@
 
       buffer.clear();
       
-      // need to reset toggle state
-      serverSending = true;
+      consumeCount = 0;
    }
    
    public long getRedeliveryDelay()
@@ -592,6 +579,37 @@
    	return redeliveryDelay;
    }
    
+   public void pause()
+   {
+      synchronized (mainLock)
+      {
+         paused = true;
+
+         sendChangeRateMessage(0f);         
+      }
+   }
+
+   public void resume()
+   {
+      synchronized (mainLock)
+      {
+         paused = false;
+
+         if (firstTime)
+         {
+            consumeCount = 0;
+
+            firstTime = false;
+         }
+         else
+         {
+            consumeCount = bufferSize / 3 - buffer.size();
+         }
+
+         sendChangeRateMessage(1f);
+      }
+   }
+
    
    // Package protected ----------------------------------------------------------------------------
    
@@ -599,6 +617,18 @@
             
    // Private --------------------------------------------------------------------------------------
 
+   private void checkSendChangeRate()
+   {
+      consumeCount++;
+      
+      if (!paused && consumeCount == bufferSize)
+      {
+         consumeCount = 0;
+
+         sendChangeRateMessage(1.0f);
+      }
+   }
+
    /*
     * Wait for the last delivery to arrive
     */
@@ -647,39 +677,6 @@
       }
    }
    
-   private void checkStop()
-   {
-      int size = buffer.size();
-      
-      if (serverSending && size >= maxBufferSize)
-      {
-         //Our buffer is full - we need to tell the server to stop sending if we haven't
-         //done so already
-         
-         sendChangeRateMessage(0f);
-         
-         if (trace) { log.trace("Sent changeRate 0 message"); }
-         
-         serverSending = false;
-      }
-   }
-   
-   private void checkStart()
-   {
-      int size = buffer.size();
-      
-      if (!serverSending && size <= minBufferSize)
-      {
-         //We need more messages - we need to tell the server this if we haven't done so already
-         
-         sendChangeRateMessage(1.0f);
-         
-         if (trace) { log.trace("Sent changeRate 1.0 message"); }
-         
-         serverSending = true;
-      }      
-   }
-   
    private void sendChangeRateMessage(float newRate) 
    {
       try
@@ -842,6 +839,8 @@
       if (!closed && !buffer.isEmpty())
       {
          m = (JBossMessage)buffer.removeFirst();
+         
+         checkSendChangeRate();
       }
 
       return m;
@@ -898,7 +897,9 @@
             
             // remove a message from the buffer
 
-            msg = (JBossMessage)buffer.removeFirst();                                       
+            msg = (JBossMessage)buffer.removeFirst();                
+            
+            checkSendChangeRate();
          }
          
          /*
@@ -946,11 +947,6 @@
             }   
          }
                   
-         if (handleFlowControl)
-         {
-         	checkStart();                                                   
-         }
-         
          if (trace) { log.trace("Exiting run()"); }
       }
    }   

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-07 15:59:53 UTC (rev 3541)
@@ -127,13 +127,19 @@
 
    private ServerPeer sp;
    
+   private int prefetchSize;
+   
+   private volatile int sendCount;
+   
+   private boolean firstTime = true;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(ServerPeer sp,String id, Queue messageQueue, String queueName,
 					           ServerSessionEndpoint sessionEndpoint, String selector,
 					           boolean noLocal, JBossDestination dest, Queue dlq,
 					           Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
-					           boolean remote, boolean replicating) throws InvalidSelectorException
+					           boolean remote, boolean replicating, int prefetchSize) throws InvalidSelectorException
    {
       if (trace)
       {
@@ -171,6 +177,16 @@
       
       this.replicating = replicating;
       
+      this.prefetchSize = prefetchSize;
+      
+      boolean slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+      
+      if (slow)
+      {
+         //Slow is same as setting prefetch size to 1 - can deprecate this in 2.0
+         prefetchSize = 1;
+      }      
+      
       this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
       
       if (dest.isTopic() && !messageQueue.isRecoverable())
@@ -307,17 +323,23 @@
             }            
          }
                   
-         if (slow)
+         sendCount++;
+         
+         int num = prefetchSize;
+         
+         if (firstTime)
          {
-         	//If this is a slow consumer, we do not want to do any message buffering, so we immediately
-         	//set clientAccepting to false
-         	//When the client has consumed the message it will send a changeRate + message which will set
-         	//clientAccepting to true again
-         	//We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
-         	//will be buffered at once due to the asynchronous nature of sending changeRate
-         	this.clientAccepting = false;
+            //We make sure we have a little extra buffer on the client side
+            num = num + num / 3 ;
          }
          
+         if (sendCount == num)
+         {
+            clientAccepting = false;
+            
+            firstTime = false;
+         }          
+                   
          try
          {
          	sessionEndpoint.handleDelivery(delivery, this);
@@ -402,21 +424,10 @@
 
       try
       {
-         // For now we just support a binary on/off.
-         // The client will send newRate = 0, to say it does not want any more messages when its
-         // client side buffer gets full or it will send an arbitrary non zero number to say it
-         // does want more messages, when its client side buffer empties to half its full size.
-         // Note the client does not wait until the client side buffer is empty before sending a
-         // newRate(+ve) message since this would add extra latency.
-
-         // In the future we can fine tune this by allowing the client to specify an actual rate in
-         // the newRate value so this is basically a placeholder for the future so we don't have to
-         // change the wire format when we support it.
-
-         // No need to synchronize - clientAccepting is volatile.
-
          if (newRate > 0)
          {
+            sendCount = 0;
+            
             clientAccepting = true;
          }
          else

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-07 15:59:53 UTC (rev 3541)
@@ -66,8 +66,6 @@
 import org.jboss.messaging.core.contract.Condition;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
-import org.jboss.messaging.newcore.Message;
-import org.jboss.messaging.newcore.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.contract.PostOffice;
@@ -81,6 +79,8 @@
 import org.jboss.messaging.core.impl.tx.TxCallback;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.newcore.Message;
+import org.jboss.messaging.newcore.MessageReference;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.MessageQueueNameHelper;
@@ -379,12 +379,6 @@
       {                
       	if (thisSequence != -1)
       	{
-      		//Need to make sure it is in correct order since np messages are sent
-      		//one way so they can arrive out of sequence
-      		
-      		//This is a workaround to allow us to use one way messages for np messages for performance
-      		//reasons
-      		
       		synchronized (waitLock)
       		{	      		      	        
    				connectionEndpoint.sendMessage(message, null, checkForDuplicates); 
@@ -1776,7 +1770,7 @@
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(sp, consumerID, binding.queue,
                                     binding.queue.getName(), this, selectorString, false,
-                                    dest, null, null, 0, -1, true, false);
+                                    dest, null, null, 0, -1, true, false, prefetchSize);
       
       ConsumerAdvised advised;
       
@@ -2078,7 +2072,7 @@
          new ServerConsumerEndpoint(sp, consumerID, queue,
                                     queue.getName(), this, selectorString, noLocal,
                                     jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse,
-                                    maxDeliveryAttemptsToUse, false, replicating);
+                                    maxDeliveryAttemptsToUse, false, replicating, prefetchSize);
       
       if (queue.isClustered() && sp.getConfiguration().isClustered() && jmsDestination.isTopic() && subscriptionName != null)
       {




More information about the jboss-cvs-commits mailing list