[jboss-cvs] JBoss Messaging SVN: r4296 - in trunk: examples/jms/src/org/jboss/jms/example and 19 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat May 24 03:07:10 EDT 2008


Author: timfox
Date: 2008-05-24 03:07:10 -0400 (Sat, 24 May 2008)
New Revision: 4296

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
Modified:
   trunk/examples/jms/build.xml
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
   trunk/src/config/jbm-configuration.xml
   trunk/src/config/queues.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
Log:
Disable MINA write queue blocking, enable producer flow control and various other bits


Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/build.xml	2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,11 +42,12 @@
 
    <!--perf props-->
    <property name="message.count" value="200000"/>
-   <property name="delivery.mode" value="NON_PERSISTENT"/>
+   <property name="message.warmup.count" value="10000"/>
+   <property name="delivery.mode" value="PERSISTENT"/>
 	<!-- in seconds -->
    <property name="sample.period" value="1"/>
-   <property name="sess.trans" value="false"/>
-   <property name="sess.trans.size" value="1"/>
+   <property name="sess.trans" value="true"/>
+   <property name="sess.trans.size" value="100"/>
 
    <path id="compile.classpath">
       <fileset dir="${lib.dir}">
@@ -125,6 +126,7 @@
       	<jvmarg value="-XX:+UseFastAccessorMethods"/>
         <arg value="-l"/>
         <arg value="${message.count}"/>
+        <arg value="${message.warmup.count}"/>
         <arg value="${delivery.mode}"/>
       	<arg value="${sample.period}"/>
       	<arg value="${sess.trans}"/>
@@ -152,6 +154,7 @@
          <jvmarg value="-XX:+UseFastAccessorMethods"/>
          <arg value="-s"/>
          <arg value="${message.count}"/>
+         <arg value="${message.warmup.count}"/>
          <arg value="${delivery.mode}"/>
          <arg value="${sample.period}"/>
          <arg value="${sess.trans}"/>

Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -53,14 +53,16 @@
       PerfExample perfExample = new PerfExample();
 
       int noOfMessages = Integer.parseInt(args[1]);
-      int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
-      long samplePeriod = Long.parseLong(args[3]);
-      boolean transacted = Boolean.parseBoolean(args[4]);
+      int noOfWarmupMessages = Integer.parseInt(args[2]);
+      int deliveryMode = args[3].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
+      long samplePeriod = Long.parseLong(args[4]);
+      boolean transacted = Boolean.parseBoolean(args[5]);
       log.info("Transacted:" + transacted);
-      int transactionBatchSize = Integer.parseInt(args[5]);
+      int transactionBatchSize = Integer.parseInt(args[6]);
 
       PerfParams perfParams = new PerfParams();
       perfParams.setNoOfMessagesToSend(noOfMessages);
+      perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
       perfParams.setDeliveryMode(deliveryMode);
       perfParams.setSamplePeriod(samplePeriod);
       perfParams.setSessionTransacted(transacted);
@@ -93,17 +95,14 @@
       {
          log.info("params = " + perfParams);
          init(perfParams.isSessionTransacted());
-         // use 10% of the messages to warm up the system
-         int warmupMessages = perfParams.getNoOfMessagesToSend() / 10;
-         log.info("warming up by sending " + warmupMessages + " messages");
-         sendMessages(warmupMessages, perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());         
-         log.info("warmed up");
-         // do not take into account messages received during warmup
+         // use 10% of the messages to warm up the system       
+         log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
+         sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());         
+         log.info("warmed up");         
          messageCount.set(0);
-         int remainingMessages = perfParams.getNoOfMessagesToSend() - warmupMessages;
 
          scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
-         sendMessages(remainingMessages, perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());         
+         sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());         
          scheduler.shutdownNow();
          
          log.info("average: " + (command.getAverage() / perfParams.getSamplePeriod()) + " msg/s");
@@ -208,11 +207,11 @@
    class PerfListener implements MessageListener
    {
       private CountDownLatch countDownLatch;
-      PerfParams perfParams;
       
-      boolean started = false;
+      private PerfParams perfParams;
+      
+      private boolean started = false;
 
-
       public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
       {
          this.countDownLatch = countDownLatch;

Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -30,6 +30,7 @@
 public class PerfParams implements Serializable
 {
    int noOfMessagesToSend = 1000;
+   int noOfWarmupMessages;
    long samplePeriod = 1; // in seconds
    int deliveryMode  = DeliveryMode.NON_PERSISTENT;
    boolean isSessionTransacted = false;
@@ -44,7 +45,17 @@
    {
       this.noOfMessagesToSend = noOfMessagesToSend;
    }
+   
+   public int getNoOfWarmupMessages()
+   {
+      return noOfWarmupMessages;
+   }
 
+   public void setNoOfWarmupMessages(int noOfWarmupMessages)
+   {
+      this.noOfWarmupMessages = noOfWarmupMessages;
+   }
+
    public long getSamplePeriod()
    {
       return samplePeriod;

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/config/jbm-configuration.xml	2008-05-24 07:07:10 UTC (rev 4296)
@@ -46,7 +46,8 @@
       
       <remoting-writequeue-minbytes>0</remoting-writequeue-minbytes>
       
-      <remoting-writequeue-maxbytes>8192</remoting-writequeue-maxbytes>
+      <!-- Effectively disable this since we're using producer and consumer flow control -->
+      <remoting-writequeue-maxbytes>100000000</remoting-writequeue-maxbytes>
       
       <!--  if ssl is enabled, all remoting-ssl-* properties must be set -->
       <remoting-enable-ssl>false</remoting-enable-ssl>
@@ -69,7 +70,7 @@
       
       <create-journal-dir>true</create-journal-dir>
       
-      <journal-type>asyncio</journal-type>
+      <journal-type>nio</journal-type>
       
       <journal-sync>true</journal-sync>
       

Modified: trunk/src/config/queues.xml
===================================================================
--- trunk/src/config/queues.xml	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/config/queues.xml	2008-05-24 07:07:10 UTC (rev 4296)
@@ -104,4 +104,4 @@
       <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </queue-settings>
 
-</deployment>
\ No newline at end of file
+</deployment>

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -118,7 +118,7 @@
       this.strictTck = false;
       this.defaultConsumerWindowSize = 1024 * 1024;      
       this.defaultConsumerMaxRate = -1;
-      this.defaultProducerWindowSize = 1000;
+      this.defaultProducerWindowSize = 1024 * 1024;
       this.defaultProducerMaxRate = -1;
       this.location = location;
       this.defaultSendNonPersistentMessagesBlocking = false;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -31,7 +31,7 @@
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 
 /**
@@ -84,7 +84,7 @@
       
    private volatile long ignoreDeliveryMark = -1;
    
-   private volatile int tokensToSend;   
+   private volatile int creditsToSend;   
    
    
 
@@ -384,13 +384,13 @@
    {
       if (clientWindowSize > 0)
       {
-         tokensToSend += messageBytes;
+         creditsToSend += messageBytes;
    
-         if (tokensToSend >= clientWindowSize)
+         if (creditsToSend >= clientWindowSize)
          {            
-            remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokensToSend));
+            remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowCreditMessage(creditsToSend));
             
-            tokensToSend = 0;            
+            creditsToSend = 0;            
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,6 +21,8 @@
   */
 package org.jboss.messaging.core.client.impl;
 
+import java.util.concurrent.Semaphore;
+
 import org.jboss.messaging.core.client.AcknowledgementHandler;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -63,8 +65,10 @@
    
    //For limit throttling
    
-   private volatile int windowSize;
+   //private AtomicInteger availableCredits = new AtomicInteger(0);
    
+   private Semaphore availableCredits;
+   
    //For rate throttling
      
    private final TokenBucketLimiter rateLimiter;
@@ -72,6 +76,8 @@
    private final boolean sendNonPersistentMessagesSynchronously;
    
    private final boolean sendPersistentMessagesSynchronously;
+   
+   private final boolean creditFlowControl;
      
    // Static ---------------------------------------------------------------------------------------
 
@@ -80,10 +86,11 @@
    public ClientProducerImpl(final ClientSessionInternal session, final long serverTargetID,
                              final long clientTargetID,
    		                    final SimpleString address,
-   		                    final RemotingConnection remotingConnection, final int windowSize,
+   		                    final RemotingConnection remotingConnection,
    		                    final int maxRate,
    		                    final boolean sendNonPersistentMessagesSynchronously,
-   		                    final boolean sendPersistentMessagesSynchronously)
+   		                    final boolean sendPersistentMessagesSynchronously,
+   		                    final int initialCredits)
    {   	
       this.session = session;
       
@@ -95,8 +102,6 @@
       
       this.remotingConnection = remotingConnection;
       
-      this.windowSize = windowSize;
-      
       if (maxRate != -1)
       {
       	this.rateLimiter = new TokenBucketLimiter(maxRate, false);
@@ -108,7 +113,13 @@
       
       this.sendNonPersistentMessagesSynchronously = sendNonPersistentMessagesSynchronously; 
       
-      this.sendPersistentMessagesSynchronously = sendPersistentMessagesSynchronously; 
+      this.sendPersistentMessagesSynchronously = sendPersistentMessagesSynchronously;
+      
+//      this.availableCredits.set(initialCredits);
+      
+      this.availableCredits = new Semaphore(initialCredits);
+      
+      this.creditFlowControl = initialCredits != -1;
    }
    
    // ClientProducer implementation ----------------------------------------------------------------
@@ -142,34 +153,19 @@
       {
          msg.setDestination(this.address);
       }
-      
-   	ProducerSendMessage message = new ProducerSendMessage(msg);
+         	   	
+   	if (rateLimiter != null)
+      {
+         // Rate flow control
+                  
+         rateLimiter.limit();
+      }
    	
-   	//TODO flow control disabled for now
-   	
-//   	//We only flow control with non-anonymous producers
-//   	if (address == null)
-//   	{
-//   		while (windowSize == 0)
-//   		{
-//				synchronized (this)
-//				{
-//					try
-//					{						
-//					   wait();						
-//					}
-//					catch (InterruptedException e)
-//					{   						
-//					}
-//				}		
-//   		}
-//   		
-//   		windowSize--;
-//   	}
-   	
    	boolean sendBlocking = msg.isDurable() && sendPersistentMessagesSynchronously ||
    	                       !msg.isDurable() && sendNonPersistentMessagesSynchronously;
-   		
+   	
+      ProducerSendMessage message = new ProducerSendMessage(msg);
+         		
    	if (sendBlocking)
    	{
    	   remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
@@ -177,14 +173,36 @@
    	else
    	{
    	   remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
-   	}
-   	 	   	
-   	if (rateLimiter != null)
-   	{
-   	   // Rate flow control
-      	   		
-   		rateLimiter.limit();
-   	}
+   	}   	 
+   	
+      //We only flow control with non-anonymous producers
+      if (address == null && creditFlowControl)
+      {
+         try
+         {
+            this.availableCredits.acquire(message.getClientMessage().encodeSize());
+         }
+         catch (InterruptedException e)
+         {           
+         }
+         
+//       while (availableCredits.get() <= 0)
+//       {
+//          //log.info("**blocked");
+//          synchronized (this)
+//          {
+//             try
+//             {                 
+//                wait();                 
+//             }
+//             catch (InterruptedException e)
+//             {                    
+//             }
+//          }     
+//       }
+//       
+//       availableCredits.addAndGet(-message.getClientMessage().encodeSize());
+      }
    }
             
    public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
@@ -218,11 +236,21 @@
    
    // ClientProducerInternal implementation --------------------------------------------------------
    
-   public synchronized void receiveTokens(int tokens)
+   public void receiveCredits(final int credits)
    {
-   	windowSize += tokens;
-   		
-   	notify();
+     // log.info("received credits " + credits);
+      
+      this.availableCredits.release(credits);
+      
+//   	int prev = availableCredits.getAndAdd(credits);
+//   	
+//   	if (prev <= 0 && prev + credits > 0)
+//   	{   		
+//   	   synchronized (this)
+//   	   {
+//   	      notify();
+//   	   }
+//   	}
    }
    
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -17,5 +17,5 @@
  */
 public interface ClientProducerInternal extends ClientProducer
 {
-	void receiveTokens(int tokens) throws Exception;
+	void receiveCredits(int credits) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -5,7 +5,7 @@
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
 
 /**
  * 
@@ -42,9 +42,9 @@
          
          if (type == EmptyPacket.PROD_RECEIVETOKENS)
          {
-            ProducerReceiveTokensMessage message = (ProducerReceiveTokensMessage) packet;
+            ProducerFlowCreditMessage message = (ProducerFlowCreditMessage) packet;
             
-            clientProducer.receiveTokens(message.getTokens());
+            clientProducer.receiveCredits(message.getTokens());
          }
          else
          {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -39,7 +39,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
@@ -311,10 +311,10 @@
 
       remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, clientTargetID));
       
-      //Now we send window size tokens to start the consumption
+      //Now we send window size credits to start the consumption
       //We even send it if windowSize == -1, since we need to start the consumer
       
-      remotingConnection.sendOneWay(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowTokenMessage(response.getWindowSize()));
+      remotingConnection.sendOneWay(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowCreditMessage(response.getWindowSize()));
 
       return consumer;
    }
@@ -367,11 +367,12 @@
       	//maxRate and windowSize can be overridden by the server
       	
       	producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
-      			                            remotingConnection, response.getWindowSize(),
+      			                            remotingConnection,
       			                            response.getMaxRate(),
-      			                            sendNonPersistentMessagesBlocking, autoCommitSends);  
+      			                            sendNonPersistentMessagesBlocking, autoCommitSends,
+      			                            response.getInitialCredits());  
       	
-      	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));
+      	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));      	
       }
 
       producers.add(producer);

Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,7 +42,7 @@
    
    private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay";
    
-   private static final String MAX_SIZE_NODE_NAME = "max-size";
+   private static final String MAX_SIZE_BYTES_NODE_NAME = "max-size-bytes";
    
    private static final String DISTRIBUTION_POLICY_CLASS_NODE_NAME = "distribution-policy-class";
    
@@ -99,9 +99,9 @@
          {
             queueSettings.setRedeliveryDelay(Long.valueOf(child.getTextContent()));
          }
-         else if (MAX_SIZE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+         else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
          {
-            queueSettings.setMaxSize(Integer.valueOf(child.getTextContent()));   
+            queueSettings.setMaxSizeBytes(Integer.valueOf(child.getTextContent()));   
          }
          else if (DISTRIBUTION_POLICY_CLASS_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
          {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -426,7 +426,6 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
       
-      
       int recordLength = record.encodeSize();
       
       int size = SIZE_ADD_RECORD_TX + recordLength;
@@ -528,7 +527,7 @@
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
-		
+	
       TransactionCallback callback = getTransactionCallback(txID);
       callback.countUp();
       
@@ -585,8 +584,7 @@
 		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
+		}		
       
 		TransactionNegPos tx = transactionInfos.remove(txID);
 		
@@ -743,7 +741,7 @@
             int pos = bb.position();
             
             byte recordType = bb.get();
-                        
+                
             switch(recordType)
             {
                case ADD_RECORD:
@@ -753,7 +751,7 @@
                   maxMessageID = Math.max(maxMessageID, id);
                   
                   byte userRecordType = bb.get();
-                  
+                   
                   int size = bb.getInt();                
                   byte[] record = new byte[size];                 
                   bb.get(record);
@@ -780,7 +778,7 @@
                   maxMessageID = Math.max(maxMessageID, id);
                   
                   byte userRecordType = bb.get();
-                 
+                  
                   int size = bb.getInt();                
                   byte[] record = new byte[size];                 
                   bb.get(record);                  

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -58,22 +58,22 @@
    
    // Bindings journal record type
    
-	private static final byte BINDING_RECORD = 1;
+	private static final byte BINDING_RECORD = 21;
 	
-	private static final byte DESTINATION_RECORD = 2;
+	private static final byte DESTINATION_RECORD = 22;
 	   
    // type + expiration + timestamp + priority
    public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE; 
    
    // Message journal record types
    
-   public static final byte ADD_MESSAGE = 1;
+   public static final byte ADD_MESSAGE = 31;
    
-   public static final byte ACKNOWLEDGE_REF = 2;
+   public static final byte ACKNOWLEDGE_REF = 32;
    
-   public static final byte UPDATE_DELIVERY_COUNT = 3;
+   public static final byte UPDATE_DELIVERY_COUNT = 33;
    
-   public static final byte SET_SCHEDULED_DELIVERY_TIME = 4;
+   public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
   	
 	private final AtomicLong messageIDSequence = new AtomicLong(0);
 	
@@ -449,16 +449,12 @@
 	{
 		List<RecordInfo> records = new ArrayList<RecordInfo>();
 		
-		bindingsJournal.load(records, null);
+		long maxID = bindingsJournal.load(records, null);
 
-		long maxID = -1;
-
 		for (RecordInfo record: records)
 		{		  
 			long id = record.id;
 			
-			maxID = Math.max(maxID, id);
-
 			byte[] data = record.data;
 
 			ByteArrayInputStream bais = new ByteArrayInputStream(data);

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -35,7 +35,7 @@
 {
 	void messageAcknowledged() throws Exception;
 	
-	void messageReceived(ServerProducer producer, int windowSize) throws Exception;
+	void requestAndSendCredits(ServerProducer producer, int windowSize) throws Exception;
 	
-	int getInitialTokens(int windowSize, ServerProducer producer);
+	int getInitialCredits(int windowSize, ServerProducer producer) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,14 +21,11 @@
  */
 package org.jboss.messaging.core.postoffice.impl;
 
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.util.SimpleString;
 
@@ -45,7 +42,7 @@
 	
 	private int lastPot;
 		
-	private int tokenPot;
+	private int creditPot;
 	
 	private final PostOffice postOffice;
 	
@@ -58,17 +55,17 @@
 		this.address = address;
 		
 		this.postOffice = postOffice;
-		
-		fillPot();
 	}
 	
-	public synchronized int getInitialTokens(final int windowSize, final ServerProducer producer)
-	{
-		int num = Math.min(windowSize, tokenPot);
+	public synchronized int getInitialCredits(final int windowSize, final ServerProducer producer) throws Exception
+	{	     
+      fillPot();
+      
+		int num = Math.min(windowSize, creditPot);
 		
-		tokenPot -= num;
+		creditPot -= num;
 		
-		if (num == 0)
+		if (num <= 0)
 		{
 			//Register producer as a waiter or will never get any messages
 			
@@ -84,28 +81,30 @@
 	//also don't want to execute the whole method if already waiting
 	public synchronized void messageAcknowledged() throws Exception
 	{		
-		fillPot();
-			
-		while (tokenPot > 0)
-		{
-			ServerProducer producer = waitingList.poll();
-			
-			if (producer == null)
-			{
-				break;
-			}
-			
-			tokenPot--;
-			
-			producer.setWaiting(false);
-			
-			producer.sendCredits();
-		}					
+//	   log.info("acking");
+//	   
+//		fillPot();
+//		
+//		log.info("Filled pot is now " + creditPot);
+//			
+//		while (creditPot > 0)
+//		{
+//			ServerProducer producer = waitingList.poll();
+//			
+//			if (producer == null)
+//			{
+//				break;
+//			}
+//			
+//			producer.setWaiting(false);
+//			
+//			producer.requestAndSendCredits();
+//		}					
 	}
 		
-	public synchronized void messageReceived(final ServerProducer producer, final int windowSize) throws Exception
+	public synchronized void requestAndSendCredits(final ServerProducer producer, final int credits) throws Exception
 	{		
-		if (tokenPot == 0)
+		if (creditPot <= 0)
 		{
 			if (!producer.isWaiting())
 			{
@@ -116,48 +115,62 @@
 		}
 		else
 		{
-			tokenPot--;
+		   int creditsToTake = Math.min(credits, creditPot);
+		   
+			creditPot -= creditsToTake;
 			
-			producer.sendCredits();
+			producer.sendCredits(creditsToTake);
 		}
 	}
 			
 	private void fillPot() throws Exception
 	{
-		List<Binding> bindings = postOffice.getBindingsForAddress(address);
+	 //TODO - for now we don't take max size into account
+	   
+//		List<Binding> bindings = postOffice.getBindingsForAddress(address);
+//		
+//		int minAvailable = Integer.MAX_VALUE;
+//		
+//		for (Binding binding: bindings)
+//		{
+//			Queue queue = binding.getQueue();
+//			
+//			int maxSize = queue.getMaxSizeBytes();
+//			
+//			
+//			//log.info("max size is " + maxSize);
+//			
+//			int available;
+//			
+//			if (maxSize == -1)
+//			{
+//				available = Integer.MAX_VALUE;
+//			}
+//			else
+//			{
+//				available = maxSize - queue.getSizeBytes();
+//				
+//				log.info("Available is " + available);
+//			}
+//			
+//			if (available < 0)
+//			{
+//				available = 0;
+//			}
+//			
+//			minAvailable = Math.min(available, minAvailable);		
+//			
+//			log.info("min available is " + minAvailable);
+//		}
+//						
+//		log.info("lastpot is " + lastPot);
+//		if (minAvailable > lastPot)
+//		{
+//			creditPot += minAvailable - lastPot;
+//			
+//			lastPot = minAvailable;
+//		}
 		
-		int minAvailable = Integer.MAX_VALUE;
-		
-		for (Binding binding: bindings)
-		{
-			Queue queue = binding.getQueue();
-			
-			int maxSize = queue.getMaxSize();
-			
-			int available;
-			
-			if (maxSize == -1)
-			{
-				available = Integer.MAX_VALUE;
-			}
-			else
-			{
-				available = maxSize - queue.getMessageCount();
-			}
-			
-			if (available < 0)
-			{
-				available = 0;
-			}
-			
-			minAvailable = Math.min(available, minAvailable);			
-		}
-						
-		if (minAvailable > lastPot)
-		{
-			tokenPot += minAvailable - lastPot;
-			
-			lastPot = minAvailable;
-		}
+		creditPot = Integer.MAX_VALUE;
 	}
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -108,7 +108,7 @@
    	{   	
       	if (!temporary)
       	{
-      		storageManager.addDestination(address);
+      		storageManager.addDestination(address);     
       	}
       	 
          flowControllers.put(address, new FlowControllerImpl(address, this));
@@ -288,7 +288,7 @@
       }     
       
       FlowController flowController = flowControllers.get(binding.getAddress());
-           
+                    
       binding.getQueue().setFlowController(flowController);
    }
    
@@ -339,7 +339,13 @@
       List<SimpleString> dests = new ArrayList<SimpleString>();
       
       storageManager.loadBindings(queueFactory, bindings, dests);
-   	
+                  
+      //Destinations must be added first to ensure flow controllers exist before queues are created
+      for (SimpleString destination: destinations)
+      {
+         addDestination(destination, false);
+      }
+                	
       Map<Long, Queue> queues = new HashMap<Long, Queue>();
       
       for (Binding binding: bindings)
@@ -348,9 +354,7 @@
          
          queues.put(binding.getQueue().getPersistenceID(), binding.getQueue());
       }
-      
-      destinations.addAll(dests);
-      
+                 
       storageManager.loadMessages(this, queues);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -27,14 +27,14 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -401,7 +401,7 @@
             }
             case EmptyPacket.CONS_FLOWTOKEN:
             {
-               packet = new ConsumerFlowTokenMessage();
+               packet = new ConsumerFlowCreditMessage();
                break;
             }
             case EmptyPacket.PROD_SEND:
@@ -411,7 +411,7 @@
             }
             case EmptyPacket.PROD_RECEIVETOKENS:
             {
-               packet = new ProducerReceiveTokensMessage();
+               packet = new ProducerFlowCreditMessage();
                break;
             }
             case EmptyPacket.RECEIVE_MSG:

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -169,7 +169,7 @@
       }
       session = future.getSession();
 
-      ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
+      //ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
       /*
       getDispatcher().register(pinger);
       if (connectionParams.getKeepAliveInterval() > 0)
@@ -177,7 +177,7 @@
          scheduledExecutor = new ScheduledThreadPoolExecutor(1);
          scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
       }*/
-      dispatcher.register(pinger);
+      //dispatcher.register(pinger);
       return new MinaSession(session, handler);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -177,58 +177,58 @@
       }
    }
 
-   @Override
-   public synchronized void messageSent(final IoSession session, final Object message) throws Exception
-   {
-      if (blocked)
-      {
-         long bytes = session.getScheduledWriteBytes();
+//   @Override
+//   public synchronized void messageSent(final IoSession session, final Object message) throws Exception
+//   {
+//      if (blocked)
+//      {
+//         long bytes = session.getScheduledWriteBytes();
+//
+//         if (bytes <= bytesLow)
+//         {
+//            blocked = false;
+//
+//            //Note that we need to notify all since there may be more than one thread waiting on this
+//            //E.g. the response from a blocking acknowledge and a delivery
+//            notifyAll();
+//         }
+//      }
+//   }
+//
+//   public synchronized void checkWrite(final IoSession session) throws Exception
+//   {
+//      while (session.getScheduledWriteBytes() >= bytesHigh)
+//      {
+//         blocked = true;
+//
+//         long start = System.currentTimeMillis();
+//
+//         long toWait = blockTimeout;
+//
+//         do
+//         {
+//            wait(toWait);
+//
+//            if (session.getScheduledWriteBytes() < bytesHigh)
+//            {
+//               break;
+//            }
+//
+//            long now = System.currentTimeMillis();
+//
+//            toWait -= now - start;
+//
+//            start = now;
+//         }
+//         while (toWait > 0);
+//
+//         if (toWait <= 0)
+//         {
+//            throw new IllegalStateException("Timed out waiting for MINA queue to free");
+//         }
+//      }
+//   }
 
-         if (bytes <= bytesLow)
-         {
-            blocked = false;
-
-            //Note that we need to notify all since there may be more than one thread waiting on this
-            //E.g. the response from a blocking acknowledge and a delivery
-            notifyAll();
-         }
-      }
-   }
-
-   public synchronized void checkWrite(final IoSession session) throws Exception
-   {
-      while (session.getScheduledWriteBytes() >= bytesHigh)
-      {
-         blocked = true;
-
-         long start = System.currentTimeMillis();
-
-         long toWait = blockTimeout;
-
-         do
-         {
-            wait(toWait);
-
-            if (session.getScheduledWriteBytes() < bytesHigh)
-            {
-               break;
-            }
-
-            long now = System.currentTimeMillis();
-
-            toWait -= now - start;
-
-            start = now;
-         }
-         while (toWait > 0);
-
-         if (toWait <= 0)
-         {
-            throw new IllegalStateException("Timed out waiting for MINA queue to free");
-         }
-      }
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -246,14 +246,14 @@
          {
             public void send(Packet p) throws Exception
             {
-               try
-               {
-                  checkWrite(session);
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to acquire sem", e);
-               }
+//               try
+//               {
+//                  checkWrite(session);
+//               }
+//               catch (Exception e)
+//               {
+//                  log.error("Failed to acquire sem", e);
+//               }
 
                dispatcher.callFilters(p);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -54,14 +54,14 @@
    
    public void write(Packet packet)
    {     
-      try
-      {
-         handler.checkWrite(session);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to acquire sem", e);
-      }
+//      try
+//      {
+//         handler.checkWrite(session);
+//      }
+//      catch (Exception e)
+//      {
+//         log.error("Failed to acquire sem", e);
+//      }
       
       session.write(packet);
    }

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java (from rev 4286, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class ConsumerFlowCreditMessage extends EmptyPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private int credits;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ConsumerFlowCreditMessage(final int credits)
+   {
+      super(CONS_FLOWTOKEN);
+
+      this.credits = credits;
+   }
+   
+   public ConsumerFlowCreditMessage()
+   {
+      super(CONS_FLOWTOKEN);
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getTokens()
+   {
+      return credits;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putInt(credits);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      credits = buffer.getInt();
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", credits=" + credits + "]";
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.util.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class ConsumerFlowTokenMessage extends EmptyPacket
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private int tokens;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ConsumerFlowTokenMessage(final int tokens)
-   {
-      super(CONS_FLOWTOKEN);
-
-      this.tokens = tokens;
-   }
-   
-   public ConsumerFlowTokenMessage()
-   {
-      super(CONS_FLOWTOKEN);
-   }
-
-   // Public --------------------------------------------------------
-
-   public int getTokens()
-   {
-      return tokens;
-   }
-   
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putInt(tokens);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      tokens = buffer.getInt();
-   }
-
-   @Override
-   public String toString()
-   {
-      return getParentString() + ", tokens=" + tokens + "]";
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * 
+ * A ProducerFlowCreditMessage
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ProducerFlowCreditMessage extends EmptyPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private int credits;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ProducerFlowCreditMessage(final int credits)
+   {
+      super(PROD_RECEIVETOKENS);
+
+      this.credits = credits;
+   }
+   
+   public ProducerFlowCreditMessage()
+   {
+      super(PROD_RECEIVETOKENS);
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getTokens()
+   {
+      return credits;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putInt(credits);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      credits = buffer.getInt();
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuffer buf = new StringBuffer(getParentString());
+      buf.append(", credits=" + credits);
+      buf.append("]");
+      return buf.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.util.MessagingBuffer;
-
-/**
- * 
- * A ProducerReceiveTokensMessage
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class ProducerReceiveTokensMessage extends EmptyPacket
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private int tokens;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ProducerReceiveTokensMessage(final int tokens)
-   {
-      super(PROD_RECEIVETOKENS);
-
-      this.tokens = tokens;
-   }
-   
-   public ProducerReceiveTokensMessage()
-   {
-      super(PROD_RECEIVETOKENS);
-   }
-
-   // Public --------------------------------------------------------
-
-   public int getTokens()
-   {
-      return tokens;
-   }
-   
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putInt(tokens);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      tokens = buffer.getInt();
-   }
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buf = new StringBuffer(getParentString());
-      buf.append(", tokens=" + tokens);
-      buf.append("]");
-      return buf.toString();
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,7 +21,7 @@
 
    private long producerTargetID;
    
-   private int windowSize;
+   private int initialCredits;
    
    private int maxRate;
 
@@ -29,13 +29,13 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerResponseMessage(final long producerTargetID, final int windowSize, final int maxRate)
+   public SessionCreateProducerResponseMessage(final long producerTargetID, final int initialCredits, final int maxRate)
    {
       super(SESS_CREATEPRODUCER_RESP);
 
       this.producerTargetID = producerTargetID;
       
-      this.windowSize = windowSize;
+      this.initialCredits = initialCredits;
       
       this.maxRate = maxRate;
    }
@@ -52,9 +52,9 @@
       return producerTargetID;
    }
    
-   public int getWindowSize()
+   public int getInitialCredits()
    {
-   	return windowSize;
+   	return initialCredits;
    }
    
    public int getMaxRate()
@@ -65,14 +65,14 @@
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putLong(producerTargetID);
-      buffer.putInt(windowSize);
+      buffer.putInt(initialCredits);
       buffer.putInt(maxRate);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
       producerTargetID = buffer.getLong();      
-      windowSize = buffer.getInt();
+      initialCredits = buffer.getInt();
       maxRate = buffer.getInt();
    }
    
@@ -82,7 +82,7 @@
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", producerTargetID=" + producerTargetID);
-      buf.append(", windowSize=" + windowSize);
+      buf.append(", initialCredits=" + initialCredits);
       buf.append(", maxRate=" + maxRate);
       buf.append("]");
       return buf.toString();

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -78,16 +78,22 @@
    
    int getDeliveringCount();
    
-   void referenceAcknowledged() throws Exception;
+   void referenceAcknowledged(MessageReference ref) throws Exception;
   
    void referenceCancelled();
    
    int getScheduledCount();
           
-   int getMaxSize();
+  // int getMaxSize();
    
-   void setMaxSize(int maxSize);
+  // int getSizeBytes();
    
+ //  void setMaxSize(int maxSize);
+   
+   int getMaxSizeBytes();
+   
+   int getSizeBytes();
+   
    DistributionPolicy getDistributionPolicy();
    
    void setDistributionPolicy(DistributionPolicy policy); 

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -39,5 +39,5 @@
 	
 	void setStarted(boolean started) throws Exception;
 	
-	void receiveTokens(int tokens) throws Exception;
+	void receiveCredits(int credits) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -16,8 +16,10 @@
 	
 	void send(ServerMessage msg) throws Exception;
 	
-	void sendCredits() throws Exception;
+	void requestAndSendCredits() throws Exception;
 	
+	void sendCredits(int credits) throws Exception;
+	
 	void setWaiting(boolean waiting);
 	
 	boolean isWaiting();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -354,15 +354,15 @@
       remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
 
       CreateConnectionResponse createConnectionResponse = new CreateConnectionResponse(connection.getID(), version);
-      if(cleanUpNotifier != null)
-      {
-         if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
-         {
-            getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
-            ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
-            new Thread(clientPinger).start();
-         }
-      }
+//      if(cleanUpNotifier != null)
+//      {
+//         if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
+//         {
+//            getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
+//            ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
+//            new Thread(clientPinger).start();
+//         }
+//      }
 
       return createConnectionResponse;
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -58,7 +58,7 @@
       QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
             
       Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, temporary,
-      		queueSettings.getMaxSize(), scheduledExecutor);
+      		queueSettings.getMaxSizeBytes(), scheduledExecutor);
 
       queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -77,8 +77,8 @@
 
    private final boolean temporary;
 
-   private volatile int maxSize;
-
+   private final int maxSizeBytes;
+     
    private final ScheduledExecutorService scheduledExecutor;
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(
@@ -95,6 +95,8 @@
    private boolean promptDelivery;
 
    private int pos;
+   
+   private AtomicInteger sizeBytes = new AtomicInteger(0);
 
    private AtomicInteger messagesAdded = new AtomicInteger(0);
 
@@ -109,7 +111,7 @@
    
    public QueueImpl(final long persistenceID, final SimpleString name,
          final Filter filter, final boolean clustered, final boolean durable,
-         final boolean temporary, final int maxSize,
+         final boolean temporary, final int maxSizeBytes,
          final ScheduledExecutorService scheduledExecutor)
    {
       this.persistenceID = persistenceID;
@@ -124,7 +126,7 @@
 
       this.temporary = temporary;
 
-      this.maxSize = maxSize;
+      this.maxSizeBytes = maxSizeBytes;
 
       this.scheduledExecutor = scheduledExecutor;
 
@@ -178,9 +180,6 @@
       deliver();
    }
  
-  // private volatile int count = 0;
-   
-
    public void deliverAsync(final Executor executor)
    {
       //Prevent too many executors running at once
@@ -238,11 +237,7 @@
          {
             if (iterator == null)
             {
-//               count++;
-//               if (count == 500000)
-//               {
-                  messageReferences.removeFirst();
-              // }
+               messageReferences.removeFirst();              
             }
             else
             {
@@ -385,14 +380,16 @@
       return deliveringCount.get();
    }
 
-   public void referenceAcknowledged() throws Exception
+   public void referenceAcknowledged(MessageReference ref) throws Exception
    {
       deliveringCount.decrementAndGet();
+      
+      sizeBytes.addAndGet(-ref.getMessage().encodeSize());
 
-      if (flowController != null)
-      {
-         flowController.messageAcknowledged();
-      }
+//      if (flowController != null)
+//      {
+//         flowController.messageAcknowledged();
+//      }
    }
 
    public void referenceCancelled()
@@ -400,20 +397,14 @@
       deliveringCount.decrementAndGet();
    }
 
-   public int getMaxSize()
+   public int getMaxSizeBytes()
    {
-      return maxSize;
+      return maxSizeBytes;
    }
-
-   public synchronized void setMaxSize(final int maxSize)
+   
+   public int getSizeBytes()
    {
-      int num = messageReferences.size() + scheduledRunnables.size();
-
-      if (maxSize < num) { throw new IllegalArgumentException(
-            "Cannot set maxSize to " + maxSize + " since there are " + num
-                  + " refs"); }
-
-      this.maxSize = maxSize;
+      return sizeBytes.get();
    }
 
    public DistributionPolicy getDistributionPolicy()
@@ -499,19 +490,16 @@
 
    private HandleStatus add(final MessageReference ref, final boolean first)
    {
-      if (maxSize != -1)
+      if (maxSizeBytes != -1 && sizeBytes.get() >= maxSizeBytes)
       {
-         int size = deliveringCount.get() + messageReferences.size() + scheduledRunnables.size();
-         
-         if (size >= maxSize)
-         {
-            return HandleStatus.BUSY;
-         }      
+         return HandleStatus.BUSY;              
       }
       
       if (!first)
       {
          messagesAdded.incrementAndGet();
+         
+         sizeBytes.addAndGet(ref.getMessage().encodeSize());
       }
       
       if (checkAndSchedule(ref))

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -90,7 +90,7 @@
          
    private final Object startStopLock = new Object();
 
-   private final AtomicInteger availableTokens;
+   private final AtomicInteger availableCredits;
    
    private boolean started;
    
@@ -139,11 +139,11 @@
       
       if (enableFlowControl)
       {
-         availableTokens = new AtomicInteger(0);
+         availableCredits = new AtomicInteger(0);
       }
       else
       {
-      	availableTokens = null;
+      	availableCredits = null;
       }
       
       messageQueue.addConsumer(this);
@@ -163,7 +163,7 @@
    
    public HandleStatus handle(MessageReference ref) throws Exception
    {      
-      if (availableTokens != null && availableTokens.get() <= 0)
+      if (availableCredits != null && availableCredits.get() <= 0)
       {
          return HandleStatus.BUSY;
       }
@@ -207,9 +207,9 @@
             }            
          }
                          
-         if (availableTokens != null)
+         if (availableCredits != null)
          {
-            availableTokens.addAndGet(-message.encodeSize());
+            availableCredits.addAndGet(-message.encodeSize());
          }
                    
          try
@@ -272,13 +272,13 @@
       }
    }
    
-   public void receiveTokens(final int tokens) throws Exception
+   public void receiveCredits(final int credits) throws Exception
    {      
-      if (availableTokens != null)
+      if (availableCredits != null)
       {
-         int previous = availableTokens.getAndAdd(tokens);
+         int previous = availableCredits.getAndAdd(credits);
 
-         if (previous <= 0 && (previous + tokens) > 0)
+         if (previous <= 0 && (previous + credits) > 0)
          {
             promptDelivery();
          }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -25,7 +25,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.server.ServerConsumer;
 
@@ -61,8 +61,8 @@
       switch (type)
       {
       case EmptyPacket.CONS_FLOWTOKEN:
-         ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
-         consumer.receiveTokens(message.getTokens());
+         ConsumerFlowCreditMessage message = (ConsumerFlowCreditMessage) packet;
+         consumer.receiveCredits(message.getTokens());
          break;
       case EmptyPacket.CLOSE:
          consumer.close();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,11 +21,13 @@
   */
 package org.jboss.messaging.core.server.impl;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.core.server.ServerSession;
@@ -52,15 +54,21 @@
 	
 	private final FlowController flowController;
 	
+	private final int windowSize;
+	
 	private final PacketReturner sender;
 	
 	private volatile boolean waiting;
 	
+   private AtomicInteger creditsToSend = new AtomicInteger(0);
+     	
 	// Constructors ----------------------------------------------------------------
 	
-	public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session, final SimpleString address, 
+	public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session,
+	                          final SimpleString address, 
 			                    final PacketReturner sender,
-			                    final FlowController flowController) throws Exception
+			                    final FlowController flowController,
+			                    final int windowSize) throws Exception
 	{
 		this.id = id;
 		
@@ -72,7 +80,9 @@
 		
 		this.sender = sender;
 		
-		this.flowController = flowController;				
+		this.flowController = flowController;		
+		
+		this.windowSize = windowSize;
 	}
 	
 	// ServerProducer implementation --------------------------------------------
@@ -86,26 +96,41 @@
 	{
 		session.removeProducer(this);
 	}
-		
+	
+
 	public void send(final ServerMessage message) throws Exception
 	{		
 		if (this.address != null)
 		{			
 		   //Only do flow control with non anonymous producers
 		   
-		   //TODO - flow control currently disabled
-//			if (flowController != null)
-//		   {
-//				flowController.messageReceived(this, 1);			
-//			}
+			if (flowController != null)
+		   {
+			   int creds = creditsToSend.addAndGet(message.encodeSize());
+			   
+			   if (creds >= windowSize)
+			   {
+			      requestAndSendCredits();
+			   }
+			}
 		}
 		
 		session.send(message);  		
 	}
+	
+	public void requestAndSendCredits() throws Exception
+	{	 
+	   if (!waiting)
+	   {
+	      flowController.requestAndSendCredits(this, creditsToSend.get());
+	   }
+	}
 
-	public void sendCredits() throws Exception
+	public void sendCredits(final int credits) throws Exception
 	{
-		Packet packet = new ProducerReceiveTokensMessage(1);
+	   creditsToSend.addAndGet(-credits);
+	   
+		Packet packet = new ProducerFlowCreditMessage(credits);
 		
 		packet.setTargetID(clientTargetID);
 		packet.setExecutorID(session.getID());

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -979,7 +979,7 @@
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSize(),
+         response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSizeBytes(),
                  queue.getConsumerCount(), queue.getMessageCount(),
                  filterString, binding.getAddress());
       }
@@ -1044,9 +1044,8 @@
     * @param address    The address to produce too
     * @param windowSize - the producer window size to use for flow control.
     *                   Specify -1 to disable flow control completely
-    *                   The actual window size used may be less than the specified window size if the queue's maxSize attribute
-    *                   is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
-    *                   specified on the queue
+    *                   The actual window size used may be less than the specified window size if
+    *                   it is overridden by any producer-window-size specified on the queue
     */
    public SessionCreateProducerResponseMessage createProducer(final long clientTargetID, final SimpleString address, final int windowSize,
                                                               final int maxRate) throws Exception
@@ -1055,24 +1054,31 @@
 
       final int maxRateToUse = maxRate;
 
-      // TODO Flow control disabled for now
+   	if (address != null)
+   	{
+   		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+   	}
 
-//   	if (address != null)
-//   	{
-//   		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
-//   	}
-
       long id = dispatcher.generateID();
+      
+      final int windowToUse = flowController == null ? -1 : windowSize;
+      
+      //Server window size is 0.75 client window size for producer flow control (other way round to consumer flow control)
+      
+      final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);            
+      
+      ServerProducerImpl producer 
+         = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController, serverWindowSize);
 
-      ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
-
       producers.add(producer);
 
       dispatcher.register(new ServerProducerPacketHandler(producer));
-
-      final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
-
-      return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
+      
+      //Get some initial credits to send to the producer - we try for windowToUse
+      
+      int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+      
+      return new SessionCreateProducerResponseMessage(producer.getID(), initialCredits, maxRateToUse);
    }
 
    // Public ---------------------------------------------------------------------------------------------
@@ -1104,7 +1110,7 @@
          }
       }
 
-      queue.referenceAcknowledged();
+      queue.referenceAcknowledged(ref);
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -23,7 +23,6 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.core.settings.Mergeable;
 import org.jboss.messaging.util.SimpleString;
@@ -32,6 +31,7 @@
  * The Queue Settings that will be used to configure a queue
  * 
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  */
 public class QueueSettings implements Mergeable<QueueSettings>
 {
@@ -41,14 +41,14 @@
     */
    public static final DistributionPolicy DEFAULT_DISTRIBUTION_POLICY = new RoundRobinDistributionPolicy();
    public static final Boolean DEFAULT_CLUSTERED = false;
-   public static final Integer DEFAULT_MAX_SIZE = -1;
+   public static final Integer DEFAULT_MAX_SIZE_BYTES = -1;
    public static final Integer DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
    public static final Integer DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
    public static final Long DEFAULT_REDELIVER_DELAY = (long) 500;
    
 
    private Boolean clustered = null;
-   private Integer maxSize = null;
+   private Integer maxSizeBytes = null;
    private String distributionPolicyClass = null;
    private Integer maxDeliveryAttempts = null;
    private Integer messageCounterHistoryDayLimit = null;
@@ -71,14 +71,14 @@
       this.clustered = clustered;
    }
 
-   public Integer getMaxSize()
+   public Integer getMaxSizeBytes()
    {
-      return maxSize != null?maxSize:DEFAULT_MAX_SIZE;
+      return maxSizeBytes != null ? maxSizeBytes:DEFAULT_MAX_SIZE_BYTES;
    }
 
-   public void setMaxSize(Integer maxSize)
+   public void setMaxSizeBytes(Integer maxSizeBytes)
    {
-      this.maxSize = maxSize;
+      this.maxSizeBytes = maxSizeBytes;
    }
 
    public Integer getMaxDeliveryAttempts()
@@ -212,9 +212,9 @@
       {
          maxDeliveryAttempts = merged.maxDeliveryAttempts;
       }
-      if(maxSize == null)
+      if(maxSizeBytes == null)
       {
-         maxSize = merged.maxSize;
+         maxSizeBytes = merged.maxSizeBytes;
       }
       if(messageCounterHistoryDayLimit == null)
       {

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -220,7 +220,7 @@
 
       for (MessageReference reference : acknowledgements)
       {
-         reference.getQueue().referenceAcknowledged();
+         reference.getQueue().referenceAcknowledged(reference);
       }
 
       clear();

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -130,7 +130,7 @@
          
          int consumerWindowSize = 1024 * 1024;
          int consumerMaxRate = -1;         
-         int producerWindowSize = 1000;
+         int producerWindowSize = 1024 * 1024;
          int producerMaxRate = -1;
          boolean blockOnAcknowledge = false;
          boolean sendNonPersistentMessagesSynchronously = false;
@@ -194,9 +194,6 @@
 
             if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(i).getNodeName()))
             {
-            	
-            	log.info("Creating cf ** with ws:" + producerWindowSize);
-            	
                String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
                String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
                jmsServerManager.createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck,

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -154,9 +154,9 @@
 
    public boolean destroyQueue(String name) throws Exception
    {
-   	JBossQueue jBossQueue = new JBossQueue(name);
-      messagingServerManagement.destroyQueue(new SimpleString(name));
-      messagingServerManagement.removeDestination(jBossQueue.getSimpleAddress());
+ //  	JBossQueue jBossQueue = new JBossQueue(name);
+//      messagingServerManagement.destroyQueue(new SimpleString(name));
+//      messagingServerManagement.removeDestination(jBossQueue.getSimpleAddress());
       List<String> jndiBindings = destinations.get(name);
       if (jndiBindings == null || jndiBindings.size() == 0)
       {
@@ -172,8 +172,8 @@
 
    public boolean destroyTopic(String name) throws Exception
    {
-      JBossTopic jBossTopic = new JBossTopic(name);
-      messagingServerManagement.removeDestination(jBossTopic.getSimpleAddress());
+   //   JBossTopic jBossTopic = new JBossTopic(name);
+    //  messagingServerManagement.removeDestination(jBossTopic.getSimpleAddress());
       List<String> jndiBindings = destinations.get(name);
       if (jndiBindings == null || jndiBindings.size() == 0)
       {
@@ -637,7 +637,7 @@
             }
 
             SubscriptionInfo info = new SubscriptionInfo(queue.getName().toString(), queue.isDurable(), subName, clientID,
-                    queue.getFilter() == null ? null : queue.getFilter().getFilterString().toString(), queue.getMessageCount(), queue.getMaxSize());
+                    queue.getFilter() == null ? null : queue.getFilter().getFilterString().toString(), queue.getMessageCount(), queue.getMaxSizeBytes());
 
             subs.add(info);
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -65,7 +65,7 @@
       QueueSettings queueSettings = new QueueSettings();
       queueSettings.setClustered(false);
       queueSettings.setRedeliveryDelay((long) 100);
-      queueSettings.setMaxSize(-100);
+      queueSettings.setMaxSizeBytes(-100);
       queueSettings.setDistributionPolicyClass("org.jboss.messaging.core.impl.RoundRobinDistributionPolicy");
       queueSettings.setMessageCounterHistoryDayLimit(1000);
       queueSettings.setDLQ(new SimpleString("DLQtest"));
@@ -111,7 +111,7 @@
          if (!queueSettings.isClustered().equals(that.isClustered())) return false;
          if (!queueSettings.getDistributionPolicyClass().equals(that.getDistributionPolicyClass())) return false;
          if (!queueSettings.getMaxDeliveryAttempts().equals(that.getMaxDeliveryAttempts())) return false;
-         if (!queueSettings.getMaxSize().equals(that.getMaxSize())) return false;
+         if (!queueSettings.getMaxSizeBytes().equals(that.getMaxSizeBytes())) return false;
          if (!queueSettings.getMessageCounterHistoryDayLimit().equals(that.getMessageCounterHistoryDayLimit()))
             return false;
          if (!queueSettings.getRedeliveryDelay().equals(that.getRedeliveryDelay())) return false;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -80,13 +80,13 @@
 import org.jboss.messaging.core.remoting.impl.mina.MessagingCodec;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -486,7 +486,7 @@
             randomLong(), randomInt(), randomInt());
 
       Packet decodedPacket = encodeAndCheckBytesAndDecode(response, response
-            .getProducerTargetID(), response.getWindowSize(), response
+            .getProducerTargetID(), response.getInitialCredits(), response
             .getMaxRate());
 
       assertTrue(decodedPacket instanceof SessionCreateProducerResponseMessage);
@@ -494,7 +494,7 @@
       assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
       assertEquals(response.getProducerTargetID(), decodedResponse
             .getProducerTargetID());
-      assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
+      assertEquals(response.getInitialCredits(), decodedResponse.getInitialCredits());
       assertEquals(response.getMaxRate(), decodedResponse.getMaxRate());
    }
 
@@ -518,28 +518,28 @@
 
    public void testConsumerFlowTokenMessage() throws Exception
    {
-      ConsumerFlowTokenMessage message = new ConsumerFlowTokenMessage(
+      ConsumerFlowCreditMessage message = new ConsumerFlowCreditMessage(
             randomInt());
 
       Packet decodedPacket = encodeAndCheckBytesAndDecode(message, message
             .getTokens());
 
-      assertTrue(decodedPacket instanceof ConsumerFlowTokenMessage);
-      ConsumerFlowTokenMessage decodedMessage = (ConsumerFlowTokenMessage) decodedPacket;
+      assertTrue(decodedPacket instanceof ConsumerFlowCreditMessage);
+      ConsumerFlowCreditMessage decodedMessage = (ConsumerFlowCreditMessage) decodedPacket;
       assertEquals(CONS_FLOWTOKEN, decodedMessage.getType());
       assertEquals(message.getTokens(), decodedMessage.getTokens());
    }
 
    public void testProducerReceiveTokensMessage() throws Exception
    {
-      ProducerReceiveTokensMessage message = new ProducerReceiveTokensMessage(
+      ProducerFlowCreditMessage message = new ProducerFlowCreditMessage(
             randomInt());
 
       Packet decodedPacket = encodeAndCheckBytesAndDecode(message, message
             .getTokens());
 
-      assertTrue(decodedPacket instanceof ProducerReceiveTokensMessage);
-      ProducerReceiveTokensMessage decodedMessage = (ProducerReceiveTokensMessage) decodedPacket;
+      assertTrue(decodedPacket instanceof ProducerFlowCreditMessage);
+      ProducerFlowCreditMessage decodedMessage = (ProducerFlowCreditMessage) decodedPacket;
       assertEquals(PROD_RECEIVETOKENS, decodedMessage.getType());
       assertEquals(message.getTokens(), decodedMessage.getTokens());
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -114,7 +114,7 @@
       assertTrue(queue.isTemporary());
    }
    
-   public void testGetSetMaxSize()
+   public void testGetMaxSizeBytes()
    {
       final int maxSize = 123456;
       
@@ -124,13 +124,7 @@
       
       assertEquals(id, queue.getPersistenceID());
       
-      assertEquals(maxSize, queue.getMaxSize());
-      
-      final int maxSize2 = 654321;
-      
-      queue.setMaxSize(maxSize2);
-      
-      assertEquals(maxSize2, queue.getMaxSize());
+      assertEquals(maxSize, queue.getMaxSizeBytes());
    }
    
    public void testAddRemoveConsumer()
@@ -216,7 +210,7 @@
    {
       Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
       
-      assertEquals(-1, queue.getMaxSize());        
+      assertEquals(-1, queue.getMaxSizeBytes());        
    }
    
    public void testSimpleAddLast()
@@ -513,9 +507,9 @@
       
       refs.clear();
       
-      for (int i = 0; i < numMessages; i++)
+      for (MessageReference ref: refs)
       {
-         queue.referenceAcknowledged();
+         queue.referenceAcknowledged(ref);
       }
       
       for (int i = 0; i < 2 * numMessages; i++)
@@ -537,11 +531,12 @@
       
       cons1.getReferences().clear();
       cons2.getReferences().clear();
-      refs.clear();
-      for (int i = 0; i < 2 * numMessages; i++)
+      
+      for (MessageReference ref: refs)
       {
-         queue.referenceAcknowledged();
+         queue.referenceAcknowledged(ref);
       }
+      refs.clear();
       
       FakeConsumer cons3 = new FakeConsumer();
       
@@ -572,11 +567,12 @@
       
       cons3.getReferences().clear();
       cons2.getReferences().clear();
-      refs.clear();
-      for (int i = 0; i < 3 * numMessages; i++)
+      
+      for (MessageReference ref: refs)
       {
-         queue.referenceAcknowledged();
+         queue.referenceAcknowledged(ref);
       }
+      refs.clear();
       
       for (int i = 0; i < 2 * numMessages; i++)
       {
@@ -598,11 +594,12 @@
       queue.removeConsumer(cons3);
       
       cons2.getReferences().clear();
-      refs.clear();
-      for (int i = 0; i < 2 * numMessages; i++)
+      
+      for (MessageReference ref: refs)
       {
-         queue.referenceAcknowledged();
+         queue.referenceAcknowledged(ref);
       }
+      refs.clear();
       
       for (int i = 0; i < numMessages; i++)
       {
@@ -862,39 +859,7 @@
          MessageReference ref = generateReference(queue, i);
          
          assertEquals(HandleStatus.BUSY, queue.addLast(ref));
-      }
-    
-      //Increase the max size
-      
-      queue.setMaxSize(2 * queue.getMaxSize());
-      
-      for (int i = 0; i < maxSize; i++)
-      {
-         MessageReference ref = generateReference(queue, i);
-         
-         refs.add(ref);
-         
-         assertEquals(HandleStatus.HANDLED, queue.addLast(ref));
-      }
-      
-      assertEquals(maxSize * 2, queue.getMessageCount());   
-      assertEquals(0, queue.getScheduledCount());
-      assertEquals(0, queue.getDeliveringCount());
-      
-      //Now try and decrease maxSize
-      
-      try
-      {
-         queue.setMaxSize(maxSize);
-         
-         fail("Should throw exception");
-      }
-      catch (IllegalArgumentException e)
-      {
-         //Ok
-      }
-      
-      assertEquals(2 * maxSize, queue.getMaxSize());      
+      }                    
    }
    
    public void testWithPriorities()
@@ -1098,7 +1063,7 @@
             
       assertRefListsIdenticalRefs(refs, consumer.getReferences()); 
       
-      queue.referenceAcknowledged();
+      queue.referenceAcknowledged(ref2);
 
       queue.removeConsumer(consumer);
             
@@ -1206,8 +1171,8 @@
             
       assertRefListsIdenticalRefs(refs, consumer.getReferences()); 
       
-      queue.referenceAcknowledged();
-      queue.referenceAcknowledged();
+      queue.referenceAcknowledged(ref5);
+      queue.referenceAcknowledged(ref6);
       
       queue.removeConsumer(consumer);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java	2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java	2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,7 +42,7 @@
       assertEquals(queueSettings.isClustered(), Boolean.valueOf(false));
       assertEquals(queueSettings.getExpiryQueue(), null);
       assertEquals(queueSettings.getMaxDeliveryAttempts(), QueueSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS);
-      assertEquals(queueSettings.getMaxSize(), QueueSettings.DEFAULT_MAX_SIZE);
+      assertEquals(queueSettings.getMaxSizeBytes(), QueueSettings.DEFAULT_MAX_SIZE_BYTES);
       assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), QueueSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT);
       assertEquals(queueSettings.getRedeliveryDelay(), QueueSettings.DEFAULT_REDELIVER_DELAY);
 
@@ -58,7 +58,7 @@
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
       queueSettingsToMerge.setMaxDeliveryAttempts(1000);
-      queueSettingsToMerge.setMaxSize(1001);
+      queueSettingsToMerge.setMaxSizeBytes(1001);
       queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       queueSettingsToMerge.setRedeliveryDelay((long)1003);
       queueSettings.merge(queueSettingsToMerge);
@@ -68,7 +68,7 @@
       assertEquals(queueSettings.getDLQ(), DLQ);
       assertEquals(queueSettings.getExpiryQueue(), exp);
       assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
-      assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+      assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
       assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
       assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(1003));
    }
@@ -83,7 +83,7 @@
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
       queueSettingsToMerge.setMaxDeliveryAttempts(1000);
-      queueSettingsToMerge.setMaxSize(1001);
+      queueSettingsToMerge.setMaxSizeBytes(1001);
       queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       queueSettings.merge(queueSettingsToMerge);
 
@@ -91,7 +91,7 @@
       queueSettingsToMerge2.setClustered(true);
       SimpleString exp2 = new SimpleString("testExpiryQueue2");
       queueSettingsToMerge2.setExpiryQueue(exp2);
-      queueSettingsToMerge2.setMaxSize(2001);
+      queueSettingsToMerge2.setMaxSizeBytes(2001);
       queueSettingsToMerge2.setRedeliveryDelay((long)2003);
       queueSettings.merge(queueSettingsToMerge2);
 
@@ -101,7 +101,7 @@
       assertEquals(queueSettings.getDLQ(), DLQ);
       assertEquals(queueSettings.getExpiryQueue(), exp);
       assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
-      assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+      assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
       assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
       assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(2003));
    }
@@ -115,7 +115,7 @@
       SimpleString exp = new SimpleString("testExpiryQueue");
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
-      queueSettingsToMerge.setMaxSize(1001);
+      queueSettingsToMerge.setMaxSizeBytes(1001);
       queueSettingsToMerge.setRedeliveryDelay((long)1003);
       queueSettings.merge(queueSettingsToMerge);
 
@@ -126,7 +126,7 @@
       queueSettingsToMerge2.setExpiryQueue(exp2);
       queueSettingsToMerge2.setDLQ(DLQ2);
       queueSettingsToMerge2.setMaxDeliveryAttempts(2000);
-      queueSettingsToMerge2.setMaxSize(2001);
+      queueSettingsToMerge2.setMaxSizeBytes(2001);
       queueSettingsToMerge2.setMessageCounterHistoryDayLimit(2002);
       queueSettingsToMerge2.setRedeliveryDelay((long)2003);
       queueSettings.merge(queueSettingsToMerge2);
@@ -137,7 +137,7 @@
       assertEquals(queueSettings.getDLQ(), DLQ);
       assertEquals(queueSettings.getExpiryQueue(), exp);
       assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(2000));
-      assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+      assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
       assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(2002));
       assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(1003));
    }




More information about the jboss-cvs-commits mailing list