[jboss-cvs] JBoss Messaging SVN: r4213 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 16 06:24:44 EDT 2008


Author: timfox
Date: 2008-05-16 06:24:44 -0400 (Fri, 16 May 2008)
New Revision: 4213

Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
Log:
Refactored consumer flow control to count encoded message bytes instead of the number of messages


Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/build-messaging.xml	2008-05-16 10:24:44 UTC (rev 4213)
@@ -633,6 +633,7 @@
             <fileset dir="${test.classes.dir}">
                <include name="**/org/jboss/messaging/tests/integration/**/*${test-mask}.class"/>
                <include name="**/org/jboss/messaging/tests/unit/**/*${test-mask}.class"/>
+            	<exclude name="**/org/jboss/messaging/tests/local/**/*${test-mask}.class"/>
             </fileset>
          </batchtest>
       </junit>
@@ -669,14 +670,13 @@
             <fileset dir="${test.jms.classes.dir}">
                <include name="**/messaging/**/${test-mask}.class"/>
                <include name="**/jms/**/${test-mask}.class"/>
-               <!-- FIXME temporarily exclude the ack tests since they hang the test suite at the moment -->
-               <exclude name="**/jms/AcknowledgementTest.class"/>
+            	<include name="**/messaging/util/**/${test-mask}.class"/>
                <!-- We exclude the recovery tests for now, until we get recovery up and running again -->
                <exclude name="**/jms/XARecoveryTest.class"/>
                <exclude name="**/jms/XAResourceRecoveryTest.class"/>
                <exclude name="**/jms/XATest.class"/>
                <exclude name="**/jms/ConnectionConsumerTest.class"/>
-               <include name="**/messaging/util/**/${test-mask}.class"/>
+               
                <exclude name="**/*NativeTest.class"/>
                <exclude name="**/jms/MemLeakTest.class"/>
                <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -100,28 +100,26 @@
    
    public ClientConnectionFactoryImpl(final Location location)
    {
-      this.location = location;
-      this.strictTck = false;
-      this.defaultConsumerWindowSize = 1000;      
-      this.defaultConsumerMaxRate = -1;
-      this.defaultProducerWindowSize = 1000;
-      this.defaultProducerMaxRate = -1;
-      this.dispatcher = new PacketDispatcherImpl(null);
-      connectionParams = new ConnectionParamsImpl();
+      this(location, new ConnectionParamsImpl(), false);
    }
 
    public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams)
    {
-      this.location = location;
+      this(location, connectionParams, false);
+   }
+   
+   protected ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams, final boolean dummy)
+   {
       this.strictTck = false;
-      this.defaultConsumerWindowSize = 1000;
+      this.defaultConsumerWindowSize = 1024 * 1024;      
       this.defaultConsumerMaxRate = -1;
       this.defaultProducerWindowSize = 1000;
       this.defaultProducerMaxRate = -1;
       this.dispatcher = new PacketDispatcherImpl(null);
+      this.location = location;
       this.connectionParams = connectionParams;
    }
-
+   
    public ClientConnection createConnection() throws MessagingException
    {
       return createConnection(null, null);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -68,7 +68,7 @@
    
    private final RemotingConnection remotingConnection;
 
-   private final int tokenBatchSize;
+   private final int clientWindowSize;
    
    private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
    
@@ -104,7 +104,7 @@
                              final long clientTargetID,
                              final ExecutorService sessionExecutor,
                              final RemotingConnection remotingConnection,
-                             final int tokenBatchSize,
+                             final int clientWindowSize,
                              final boolean direct)
    {
       this.targetID = targetID;
@@ -117,7 +117,7 @@
       
       this.remotingConnection = remotingConnection;
       
-      this.tokenBatchSize = tokenBatchSize;
+      this.clientWindowSize = clientWindowSize;
       
       this.direct = direct;
    }
@@ -175,7 +175,7 @@
                
                session.delivered(m.getDeliveryID(), expired);
                
-               flowControl();
+               flowControl(m.encodeSize());
                                  
                if (expired)
                {
@@ -240,7 +240,9 @@
       {
          return;
       }
- 
+      
+      log.info("** max size is " + this.maxSize);
+      
       try
       {
          // Now we wait for any current handler runners to run.
@@ -324,7 +326,7 @@
 
             session.delivered(message.getDeliveryID(), expired);
             
-            flowControl();
+            flowControl(message.encodeSize());
 
             if (!expired)
             {
@@ -338,6 +340,8 @@
          	synchronized (this)
          	{
          		buffer.addLast(message, message.getPriority());
+         		
+         		maxSize = Math.max(maxSize, buffer.size());
          	}
          	            	
          	sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
@@ -355,6 +359,8 @@
       	}
       }      
    }
+   
+   int maxSize = 0;
 
    public void recover(final long lastDeliveryID)
    {
@@ -375,17 +381,17 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private void flowControl() throws MessagingException
+   private void flowControl(final int messageBytes) throws MessagingException
    {
-      if (tokenBatchSize > 0)
+      if (clientWindowSize > 0)
       {
-         tokensToSend++;
+         tokensToSend += messageBytes;
    
-         if (tokensToSend == tokenBatchSize)
-         {
-            tokensToSend = 0;
+         if (tokensToSend >= clientWindowSize)
+         {            
+            remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokensToSend));
             
-            remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokenBatchSize));                  
+            tokensToSend = 0;            
          }
       }
    }
@@ -456,7 +462,7 @@
    
             session.delivered(message.getDeliveryID(), expired);
             
-            flowControl();
+            flowControl(message.encodeSize());
    
             if (!expired)
             {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -275,10 +275,31 @@
       
       SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       
-      int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
+      int windowSize = response.getWindowSize();
       
+      int clientWindowSize;
+      if (windowSize == -1)
+      {
+         //No flow control - buffer can increase without bound! Only use with caution for very fast consumers
+         clientWindowSize = 0;
+      }
+      else if (windowSize == 1)
+      {
+         //Slow consumer - no buffering
+         clientWindowSize = 1;
+      }
+      else if (windowSize > 1)
+      {
+         //Client window size is half server window size
+         clientWindowSize = windowSize >> 1;
+      }
+      else
+      {
+         throw new IllegalArgumentException("Invalid window size " + windowSize);
+      }
+      
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize, direct);
+         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, clientWindowSize, direct);
 
       consumers.put(response.getConsumerTargetID(), consumer);
       

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -40,6 +40,4 @@
 	void setStarted(boolean started) throws Exception;
 	
 	void receiveTokens(int tokens) throws Exception;
-	
-	void promptDelivery();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -224,7 +224,7 @@
 //               if (count == 500000)
 //               {
                   messageReferences.removeFirst();
-            //   }
+              // }
             }
             else
             {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -93,7 +93,7 @@
    private final AtomicInteger availableTokens;
    
    private boolean started;
-
+   
    // Constructors ---------------------------------------------------------------------------------
  
    ServerConsumerImpl(final long id, final long clientTargetID, final Queue messageQueue, final boolean noLocal, final Filter filter,
@@ -162,12 +162,12 @@
    }
    
    public HandleStatus handle(MessageReference ref) throws Exception
-   {
-      if (availableTokens != null && availableTokens.get() == 0)
+   {      
+      if (availableTokens != null && availableTokens.get() <= 0)
       {
          return HandleStatus.BUSY;
       }
-
+      
       if (ref.getMessage().isExpired())
       {         
          ref.expire(persistenceManager, postOffice, queueSettingsRepository);
@@ -209,7 +209,7 @@
                          
          if (availableTokens != null)
          {
-            availableTokens.decrementAndGet();
+            availableTokens.addAndGet(-message.encodeSize());
          }
                    
          try
@@ -274,18 +274,16 @@
    
    public void receiveTokens(final int tokens) throws Exception
    {
-      int previous = availableTokens != null ? availableTokens.getAndAdd(tokens) : 0;
-
-      if (previous == 0)
+      if (availableTokens != null)
       {
-      	promptDelivery();      
-      }   	
-   }
-   
-   public void promptDelivery()
-   {
-      sessionEndpoint.promptDelivery(messageQueue);
-   } 
+         int previous = availableTokens.getAndAdd(tokens);
+         
+         if (previous <= 0)
+         {
+            promptDelivery();
+         }
+      }  	
+   }      
 
    // Public -----------------------------------------------------------------------------
      
@@ -296,4 +294,8 @@
    
    // Private --------------------------------------------------------------------------------------
 
+   private void promptDelivery()
+   {
+      sessionEndpoint.promptDelivery(messageQueue);
+   } 
 }

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-05-16 10:24:44 UTC (rev 4213)
@@ -126,7 +126,7 @@
          String clientID = null;
          int dupsOKBatchSize = 1000;
          
-         int consumerWindowSize = 1000;
+         int consumerWindowSize = 1024 * 1024;
          int consumerMaxRate = -1;         
          int producerWindowSize = 1000;
          int producerMaxRate = -1;




More information about the jboss-cvs-commits mailing list