[jboss-cvs] JBoss Messaging SVN: r3477 - in branches/Branch_Stable/src/main/org/jboss: jms/server/connectionfactory and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 11 18:40:56 EST 2007


Author: timfox
Date: 2007-12-11 18:40:55 -0500 (Tue, 11 Dec 2007)
New Revision: 3477

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1199


Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -205,27 +205,32 @@
    private int ackMode;
    private boolean closed;
    private Object mainLock;
-   private int maxBufferSize;
-   private int minBufferSize;
    private QueuedExecutor sessionExecutor;
    private boolean listenerRunning;
    private int maxDeliveries;
    private String queueName;
    private long lastDeliveryId = -1;
-   private volatile boolean serverSending = true;
+   private int bufferSize;
    private boolean waitingForLastDelivery;
    private boolean shouldAck;
-   private boolean handleFlowControl;
    private long redeliveryDelay;
-   private volatile int currentToken;
-
+   private volatile int currentToken;   
+   private boolean paused;      
+   private int consumeCount;
+   private boolean firstTime = true;
+   
+   public int getBufferSize()
+   {
+      return buffer.size();
+   }
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConsumer(boolean isCC, int ackMode,                                
                          SessionDelegate sess, ConsumerDelegate cons, String consumerID,
                          String queueName,
                          int bufferSize, QueuedExecutor sessionExecutor,
-                         int maxDeliveries, boolean shouldAck, boolean handleFlowControl,
+                         int maxDeliveries, boolean shouldAck,
                          long redeliveryDelay)
    {
       if (bufferSize < 1)
@@ -233,8 +238,7 @@
          throw new IllegalArgumentException(this + " bufferSize must be > 0");
       }
               
-      this.maxBufferSize = bufferSize;
-      this.minBufferSize = bufferSize / 2;
+      this.bufferSize = bufferSize;
       buffer = new BasicPriorityLinkedList(10);
       isConnectionConsumer = isCC;
       this.ackMode = ackMode;
@@ -246,7 +250,6 @@
       this.sessionExecutor = sessionExecutor;
       this.maxDeliveries = maxDeliveries;
       this.shouldAck = shouldAck;
-      this.handleFlowControl = handleFlowControl;
       this.redeliveryDelay = redeliveryDelay;
    }
         
@@ -515,14 +518,7 @@
             receiverThread = null;            
          }
       } 
-      
-      //This needs to be outside the lock
-
-      if (handleFlowControl)
-      {
-      	checkStart();
-      }
-      
+           
       if (trace) { log.trace(this + " receive() returning " + m); }
       
       return m;
@@ -554,6 +550,8 @@
       {
          buffer.addFirst(proxy, proxy.getJMSPriority());
          
+         consumeCount--;
+         
          messageAdded();
       }
    }
@@ -575,16 +573,59 @@
 
       buffer.clear();
       
-      // need to reset toggle state
-      serverSending = true;
+      consumeCount = 0;
    }
    
    public long getRedeliveryDelay()
    {
    	return redeliveryDelay;
    }
+
+   public void pause()
+   {
+      synchronized (mainLock)
+      {
+         paused = true;
+         
+         sendChangeRateMessage(0f);         
+      }
+   }
    
+   public void resume()
+   {
+      synchronized (mainLock)
+      {
+         paused = false;
+         
+         if (firstTime)
+         {
+            consumeCount = 0;
+            
+            firstTime = false;
+         }
+         else
+         {
+            consumeCount = bufferSize / 3 - buffer.size();
+         }
+                          
+         sendChangeRateMessage(1f);
+      }
+   }
    
+   /*
+    * When I unpause I have x messages in buffer, I will then receive another buffersize messages
+    * 
+    * that will give x + buffersize messages
+    * 
+    * if x = buffersize / 3 then consumeCount = 0
+    * 
+    * consumerCount = x - buffer size / 3
+    * 
+    * 
+    * 
+    * 
+    */
+      
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------
@@ -639,39 +680,6 @@
       }
    }
    
-   private void checkStop()
-   {
-      int size = buffer.size();
-      
-      if (serverSending && size >= maxBufferSize)
-      {
-         //Our buffer is full - we need to tell the server to stop sending if we haven't
-         //done so already
-         
-         sendChangeRateMessage(0f);
-         
-         if (trace) { log.trace("Sent changeRate 0 message"); }
-         
-         serverSending = false;
-      }
-   }
-   
-   private void checkStart()
-   {
-      int size = buffer.size();
-      
-      if (!serverSending && size <= minBufferSize)
-      {
-         //We need more messages - we need to tell the server this if we haven't done so already
-         
-         sendChangeRateMessage(1.0f);
-         
-         if (trace) { log.trace("Sent changeRate 1.0 message"); }
-         
-         serverSending = true;
-      }      
-   }
-   
    private void sendChangeRateMessage(float newRate) 
    {
       try
@@ -834,11 +842,25 @@
       if (!closed && !buffer.isEmpty())
       {
          m = (MessageProxy)buffer.removeFirst();
+         
+         checkSendChangeRate();
       }
 
       return m;
    }
    
+   private void checkSendChangeRate()
+   {
+      consumeCount++;
+      
+      if (!paused && consumeCount == bufferSize)
+      {
+         consumeCount = 0;
+         
+         sendChangeRateMessage(1.0f);
+      }
+   }
+   
    // Inner classes --------------------------------------------------------------------------------
          
    /*
@@ -911,11 +933,6 @@
                 if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
 
                 messageAdded();
-
-                if (handleFlowControl)
-                {
-                	checkStop();
-                }
              }
          }
          catch (Exception e)
@@ -951,7 +968,9 @@
             
             // remove a message from the buffer
 
-            mp = (MessageProxy)buffer.removeFirst();                                       
+            mp = (MessageProxy)buffer.removeFirst();                       
+            
+            checkSendChangeRate();
          }
          
          /*
@@ -999,11 +1018,6 @@
             }   
          }
                   
-         if (handleFlowControl)
-         {
-         	checkStart();                                                   
-         }
-         
          if (trace) { log.trace("Exiting run()"); }
       }
    }         

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -110,7 +110,7 @@
          new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
                             sessionDelegate, consumerDelegate, consumerID, queueName,
                             prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
-                            autoFlowControl, redeliveryDelay);
+                            redeliveryDelay);
       
       sessionState.addCallbackHandler(messageHandler);
       

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -190,10 +190,6 @@
          
          if (refCount == 1 && enablePing)
          {
-            // TODO Something is not quite right here, we can detect failure even if pinging is not
-            // enabled, for example if we try to send a callback to the client and sending the
-            // calback fails
-
             // install the connection listener that listens for failed connections            
             server.invoke(connectorObjectName, "addConnectionListener",
                   new Object[] {connectionManager},

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -107,17 +107,17 @@
    
    private boolean replicating;
    
-   private boolean slow;
-   
    private volatile boolean dead;
    
+   private int prefetchSize;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(String id, Queue messageQueue, String queueName,
 					           ServerSessionEndpoint sessionEndpoint, String selector,
 					           boolean noLocal, JBossDestination dest, Queue dlq,
 					           Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
-					           boolean remote, boolean replicating) throws InvalidSelectorException
+					           boolean remote, boolean replicating, int prefetchSize) throws InvalidSelectorException
    {
       if (trace)
       {
@@ -154,9 +154,17 @@
       this.preserveOrdering = sessionEndpoint.getConnectionEndpoint().getServerPeer().isDefaultPreserveOrdering();
       
       this.replicating = replicating;
+            
+      this.prefetchSize = prefetchSize;
       
-      this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+      boolean slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
       
+      if (slow)
+      {
+         //Slow is same as setting prefetch size to 1 - can deprecate this in 2.0
+         prefetchSize = 1;
+      }      
+      
       if (dest.isTopic() && !messageQueue.isRecoverable())
       {
          // This is a consumer of a non durable topic subscription. We don't need to store
@@ -291,17 +299,24 @@
             }            
          }
                   
-         if (slow)
+                 
+         sendCount++;
+         
+         int num = prefetchSize;
+         
+         if (firstTime)
          {
-         	//If this is a slow consumer, we do not want to do any message buffering, so we immediately
-         	//set clientAccepting to false
-         	//When the client has consumed the message it will send a changeRate + message which will set
-         	//clientAccepting to true again
-         	//We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
-         	//will be buffered at once due to the asynchronous nature of sending changeRate
-         	this.clientAccepting = false;
+            //We make sure we have a little extra buffer on the client side
+            num = num + num / 3 ;
          }
          
+         if (sendCount == num)
+         {
+            clientAccepting = false;
+            
+            firstTime = false;
+         }         
+         
          try
          {
          	sessionEndpoint.handleDelivery(delivery, this);
@@ -312,11 +327,15 @@
          	
          	this.started = false; // DO NOT return null or the message might get delivered more than once
          }
-                          
+                                           
          return delivery;
       }
    }
    
+   private volatile int sendCount;
+   
+   private boolean firstTime = true;
+   
    // Filter implementation ------------------------------------------------------------------------
 
    public boolean accept(Message msg)
@@ -386,21 +405,9 @@
 
       try
       {
-         // For now we just support a binary on/off.
-         // The client will send newRate = 0, to say it does not want any more messages when its
-         // client side buffer gets full or it will send an arbitrary non zero number to say it
-         // does want more messages, when its client side buffer empties to half its full size.
-         // Note the client does not wait until the client side buffer is empty before sending a
-         // newRate(+ve) message since this would add extra latency.
-
-         // In the future we can fine tune this by allowing the client to specify an actual rate in
-         // the newRate value so this is basically a placeholder for the future so we don't have to
-         // change the wire format when we support it.
-
-         // No need to synchronize - clientAccepting is volatile.
-
          if (newRate > 0)
          {
+            sendCount = 0;
             clientAccepting = true;
          }
          else

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -1781,7 +1781,7 @@
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, binding.queue,
                                     binding.queue.getName(), this, selectorString, false,
-                                    dest, null, null, 0, -1, true, false);
+                                    dest, null, null, 0, -1, true, false, prefetchSize);
       
       ConsumerAdvised advised;
       
@@ -2085,7 +2085,7 @@
          new ServerConsumerEndpoint(consumerID, queue,
                                     queue.getName(), this, selectorString, noLocal,
                                     jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse,
-                                    maxDeliveryAttemptsToUse, false, replicating);
+                                    maxDeliveryAttemptsToUse, false, replicating, prefetchSize);
       
       if (queue.isClustered() && postOffice.isClustered() && jmsDestination.isTopic() && subscriptionName != null)
       {

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-12-11 23:40:55 UTC (rev 3477)
@@ -22,17 +22,17 @@
 
 package org.jboss.messaging.core.impl.clusterconnection;
 
-import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.client.container.ClientConsumer;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.state.ConsumerState;
 import org.jboss.jms.delegate.ProducerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
@@ -76,8 +76,10 @@
 	
 	private boolean consuming;
 	
-	private ConsumerDelegate consumer;
+	private ClientConsumerDelegate consumer;
 	
+	private ClientConsumer clientConsumer;
+	
 	private boolean preserveOrdering;
 	
 	private long sourceChannelID;
@@ -158,8 +160,10 @@
 		//manually using changeRate() methods
 		//The local queue itself will manually send these messages depending on its state - 
 		//So effectively the message buffering is handled by the local queue, not the ClientConsumer
-		consumer = sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
-				
+		consumer = (ClientConsumerDelegate)sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
+		
+		clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+								
 		consumer.setMessageListener(this);		
 		
 		//Register ourselves with the local queue - this queue will handle flow control for us
@@ -218,22 +222,18 @@
 		{
 			if (consume && !consuming)
 			{
-				//Send a changeRate(1) message - to start consumption
+				if (trace) { log.trace(this + " resuming client consumer"); }
+			   
+			   clientConsumer.resume();
 				
-				consumer.changeRate(1f);
-				
-				if (trace) { log.trace(this + " sent changeRate(1) message"); }
-				
 				consuming = true;
 			}
 			else if (!consume && consuming)
 			{
-				//Send a changeRate(0) message to stop consumption
+				if (trace) { log.trace(this + " pausing client consumer"); }
+			   
+			   clientConsumer.pause();
 				
-				consumer.changeRate(0f);
-				
-				if (trace) { log.trace(this + " sent changeRate(0) message"); }
-				
 				consuming = false;
 			}
 		}




More information about the jboss-cvs-commits mailing list