[jboss-cvs] JBoss Messaging SVN: r3853 - in trunk: src/etc/server/default/deploy and 33 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 7 11:33:34 EST 2008


Author: timfox
Date: 2008-03-07 11:33:34 -0500 (Fri, 07 Mar 2008)
New Revision: 3853

Added:
   trunk/tests/src/org/jboss/messaging/core/journal/
   trunk/tests/src/org/jboss/messaging/core/journal/impl/
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Removed:
   trunk/tests/src/org/jboss/test/messaging/jms/ssl/
Modified:
   trunk/.classpath
   trunk/build-messaging.xml
   trunk/src/etc/server/default/deploy/jbm-beans.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
   trunk/src/main/org/jboss/messaging/core/message/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
   trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
   trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java
   trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java
   trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java
   trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java
   trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java
   trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
Various changes


Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/.classpath	2008-03-07 16:33:34 UTC (rev 3853)
@@ -77,5 +77,6 @@
 	<classpathentry kind="lib" path="src/etc/server/default/config"/>
 	<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
 	<classpathentry kind="lib" path="lib/je-3.2.74.jar"/>
+	<classpathentry kind="lib" path="tests/lib/grizzly-framework-1.7.2.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/build-messaging.xml	2008-03-07 16:33:34 UTC (rev 3853)
@@ -81,8 +81,8 @@
    <property name="source.java" value="${project.source}/main"/>
    <property name="source.etc" value="${project.source}/etc"/>
    <property name="project.output" value="${project.root}/output"/>
-   <property name="build.jniheader" value="${project.output}/include"/>
-   <property name="build.classes" value="${project.output}/classes"/>
+   <property name="build.jniheader" value="${project.output}/include"/>
+   <property name="build.classes" value="${project.output}/classes"/>
    <property name="build.lib" value="${project.output}/lib"/>
    <property name="build.api" value="${project.output}/api"/>
    <property name="build.etc" value="${project.output}/etc"/>
@@ -139,7 +139,7 @@
       <path refid="apache.mina.classpath"/>
       <path refid="slf4j.api.classpath"/>
       <path refid="slf4j.log4j.classpath"/>
-      <!-- <pathelement path="${build.lib}/je-3.2.74.jar"/> -->
+      <pathelement path="${build.lib}/je-3.2.44.jar"/>
    </path>
 
    <!--
@@ -171,7 +171,7 @@
    <path id="compilation.classpath">
       <path refid="external.dependencies.classpath"/>
       <path refid="jboss.dependencies.classpath"/>
-      <pathelement location="${project.root}/lib/je-3.2.74.jar"/>
+      <pathelement location="${project.root}/lib/je-3.2.44.jar"/>
       <pathelement location="${build.classes}"/> 
    </path>
 
@@ -240,8 +240,8 @@
          <include name="**/*.java"/>
          <classpath refid="compilation.classpath"/>
       </javac>
-   </target>
-	
+   </target>
+	
    <target name="compile-etc">
 
       <mkdir dir="${build.etc}"/>
@@ -657,25 +657,25 @@
    <!-- ======================================================================================== -->
    <!-- Native Tasks                                                                             -->
    <!-- ======================================================================================== -->
-
-	<property name="native.src" value="./native"/>
-	<property name="native.include" value="${native.src}/src"/>
-	
-	<target name="native-header" depends="compile">
-            <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
-            </javah>
-    </target>
-
-	<target name="native" depends="native-header">
-		<exec dir="${native.src}" executable="autoreconf">
-			<arg value="--install"/>
-		</exec>
-		<exec dir="${native.src}" executable="sh">
-			<arg value="./configure"/>
-		</exec>
-		<exec dir="${native.src}" executable="make"/>
-    </target>
-
+
+	<property name="native.src" value="./native"/>
+	<property name="native.include" value="${native.src}/src"/>
+	
+	<target name="native-header" depends="compile">
+            <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
+            </javah>
+    </target>
+
+	<target name="native" depends="native-header">
+		<exec dir="${native.src}" executable="autoreconf">
+			<arg value="--install"/>
+		</exec>
+		<exec dir="${native.src}" executable="sh">
+			<arg value="./configure"/>
+		</exec>
+		<exec dir="${native.src}" executable="make"/>
+    </target>
+
    <!-- ======================================================================================== -->
    <!-- Cleaning Tasks                                                                           -->
    <!-- ======================================================================================== -->

Modified: trunk/src/etc/server/default/deploy/jbm-beans.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-beans.xml	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/etc/server/default/deploy/jbm-beans.xml	2008-03-07 16:33:34 UTC (rev 3853)
@@ -4,8 +4,7 @@
 
    <bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
 
-   <bean name="ServiceLocator" class="org.jboss.messaging.microcontainer.ServiceLocator">
-   </bean>
+   <bean name="ServiceLocator" class="org.jboss.messaging.microcontainer.ServiceLocator"/>
 
    <bean name="MessagingServerManagement" class="org.jboss.messaging.core.management.impl.MessagingServerManagementImpl">
       <annotation>@org.jboss.aop.microcontainer.aspects.jmx.JMX(name="jboss.messaging:service=MessagingServerManagement", exposedInterface=org.jboss.messaging.core.management.MessagingServerManagement.class)</annotation>
@@ -40,9 +39,7 @@
       </property>
             
    </bean>
-   
-   
-   
+         
    <bean name="BDBJEEnvironment" class="org.jboss.messaging.core.persistence.impl.bdbje.integration.RealBDBJEEnvironment">
    
       <property name="environmentPath">${user.home}/bdbje/env</property>
@@ -51,9 +48,11 @@
       
       <property name="syncVM">true</property>
       
-      <property name="syncOS">false</property>
+      <property name="syncOS">true</property>
    
-      <property name="memoryCacheSize">-1</property>
+      <!-- We set the BDB cache low. We don't really use BDB for caching, and setting it large will increase
+      memory footprint since messages will be in memory twice - once in the queue and once in the cache -->
+      <property name="memoryCacheSize">1048576</property>
             
    </bean>
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -69,33 +69,35 @@
  
    private final int serverID;
    
-   private final int prefetchSize;
-
    private final boolean strictTck;
+      
+   private final int defaultConsumerWindowSize;
    
-   private final int maxProducerRate;
+   private final int defaultConsumerMaxRate;
+
+   private final int defaultProducerWindowSize;
    
-   private final int producerWindowSize;
+   private final int defaultProducerMaxRate;
    
+   
    // Static ---------------------------------------------------------------------------------------
     
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConnectionFactoryImpl(final int serverID, final RemotingConfiguration remotingConfig,
    		                             final Version serverVersion, final boolean strictTck,
-                                      final int prefetchSize,
-                                      final int producerWindowSize, final int maxProducerRate)
+                                      final int defaultConsumerWindowSize, final int defaultConsumerMaxRate,
+                                      final int defaultProducerWindowSize, final int defaultProducerMaxRate)
    {
       this.serverID = serverID;
       this.remotingConfig = remotingConfig;
       this.serverVersion = serverVersion;
       this.strictTck = strictTck;
-      this.prefetchSize = prefetchSize;      
-      this.maxProducerRate = maxProducerRate;
-      this.producerWindowSize = producerWindowSize;
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;  
+      this.defaultConsumerMaxRate = defaultConsumerMaxRate;
+      this.defaultProducerWindowSize = defaultProducerWindowSize;
+      this.defaultProducerMaxRate = defaultProducerMaxRate;
       this.dispatcher = new PacketDispatcherImpl();
-      
-      log.info("creating cf with ws: "+ this.producerWindowSize + " and maxrate " + maxProducerRate);
    }
    
    public ClientConnectionFactoryImpl(final int serverID, final RemotingConfiguration remotingConfig,
@@ -105,9 +107,10 @@
       this.remotingConfig = remotingConfig;
       this.serverVersion = serverVersion;
       this.strictTck = false;
-      this.prefetchSize = 150;      
-      this.maxProducerRate = -1;
-      this.producerWindowSize = 1000;
+      this.defaultConsumerWindowSize = 1000;      
+      this.defaultConsumerMaxRate = -1;
+      this.defaultProducerWindowSize = 1000;
+      this.defaultProducerMaxRate = -1;
       this.dispatcher = new PacketDispatcherImpl();
    }
 
@@ -130,15 +133,15 @@
          String sessionID = remotingConnection.getSessionID();
          
          CreateConnectionRequest request =
-            new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, username, password,
-                  prefetchSize);
+            new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, username, password);
          
          CreateConnectionResponse response =
             (CreateConnectionResponse)remotingConnection.send(id, request);
          
          ClientConnectionImpl connection =
             new ClientConnectionImpl(response.getConnectionID(), serverID, strictTck, remotingConnection,
-            		maxProducerRate, producerWindowSize);
+            		defaultConsumerWindowSize, defaultConsumerMaxRate,
+            		defaultProducerWindowSize, defaultProducerMaxRate);
 
          return connection;
       }
@@ -182,14 +185,14 @@
       return serverVersion;
    }
 
-	public int getPrefetchSize()
+	public int getConsumerWindowSize()
 	{
-		return prefetchSize;
+		return defaultConsumerWindowSize;
 	}
 
 	public int getProducerWindowSize()
 	{
-		return producerWindowSize;
+		return defaultProducerWindowSize;
 	}
 
 	public int getServerID()
@@ -204,7 +207,7 @@
 
 	public int getMaxProducerRate()
 	{
-		return maxProducerRate;
+		return defaultProducerMaxRate;
 	}
 	
 	

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -70,17 +70,25 @@
 
    private volatile boolean closed;
    
-   private final int maxProducerRate;
+   private final int defaultConsumerWindowSize;
    
-   private final int producerWindowSize;
+   private final int defaultConsumerMaxRate;
+   
+   private final int defaultProducerWindowSize;
+   
+   private final int defaultProducerMaxRate;
+   
 
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConnectionImpl(final String id, final int serverID, final boolean strictTck,
-                               final RemotingConnection connection, final int maxProducerRate,
-                               final int producerWindowSize)
+                               final RemotingConnection connection,
+                               final int defaultConsumerWindowSize,     
+                               final int defaultConsumerMaxRate,
+                               final int defaultProducerWindowSize,
+                               final int defaultProducerMaxRate)
    {
       this.id = id;
       
@@ -90,9 +98,13 @@
       
       this.remotingConnection = connection;
       
-      this.maxProducerRate = maxProducerRate;
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
       
-      this.producerWindowSize = producerWindowSize;
+      this.defaultConsumerMaxRate = defaultConsumerMaxRate;
+      
+      this.defaultProducerWindowSize = defaultProducerWindowSize;
+      
+      this.defaultProducerMaxRate = defaultProducerMaxRate;
    }
    
    // ClientConnection implementation --------------------------------------------------------------
@@ -108,8 +120,9 @@
       ConnectionCreateSessionResponseMessage response = (ConnectionCreateSessionResponseMessage)remotingConnection.send(id, request);   
 
       ClientSession session =
-      	new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers, maxProducerRate,
-      			                producerWindowSize, autoCommitSends, autoCommitAcks, blockOnAcknowledge);
+      	new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers,
+      			autoCommitSends, autoCommitAcks, blockOnAcknowledge,
+      			defaultConsumerWindowSize, defaultConsumerMaxRate, defaultProducerWindowSize, defaultProducerMaxRate);
 
       children.put(response.getSessionID(), session);
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -276,8 +276,6 @@
          return;
       }
       
-      log.info("Got message " + message.getMessage().getMessageID() + " del id " + message.getDeliveryID());
-      
       if (ignoreDeliveryMark >= 0)
       {
          long delID = message.getDeliveryID();
@@ -286,14 +284,12 @@
          {
             // Ignore - the session is recovering and these are inflight
             // messages
-         	log.info("Ignoring");
             return;
          }
          else if (delID == ignoreDeliveryMark)
          {
             // We have hit the begining of the recovered messages - we can
             // stop ignoring
-         	log.info("Stopping ignoring");
             ignoreDeliveryMark = -1;
          }
          else
@@ -337,8 +333,6 @@
       	
       	synchronized (this)
       	{
-      		log.info("Adding to buffer: " + message.getMessage().getMessageID());
-      		
       		buffer.addLast(message, message.getMessage().getPriority());
          	      		
       		notify();
@@ -348,12 +342,9 @@
 
    public void recover(final long lastDeliveryID)
    {
-   	log.info("Calling recover " +lastDeliveryID);
-   	
       ignoreDeliveryMark = lastDeliveryID;
 
       buffer.clear();      
-      log.info("Called recover");
    }
 
    // Public

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -121,8 +121,8 @@
    {
    	ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
    	
-   	//Window size of -1 means disable window flow control
-   	if (windowSize != -1 && address == null)
+   	//We only flow control with non-anonymous producers
+   	if (address == null)
    	{
    		while (windowSize == 0)
    		{

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -105,10 +105,14 @@
    
    private final boolean cacheProducers;
    
-   private final int maxProducerRate;
+   private final int defaultConsumerWindowSize;   
    
-   private final int producerWindowSize;
+   private final int defaultConsumerMaxRate;
    
+   private final int defaultProducerWindowSize;
+   
+   private final int defaultProducerMaxRate;
+     
    private final ExecutorService executor;
    
    private volatile boolean closed;
@@ -149,10 +153,13 @@
    // Constructors ---------------------------------------------------------------------------------
    
    public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
-                            final int lazyAckBatchSize, final boolean cacheProducers,
-                            final int maxProducerRate, final int producerWindowSize,
+                            final int lazyAckBatchSize, final boolean cacheProducers,                            
                             final boolean autoCommitSends, final boolean autoCommitAcks,
-                            final boolean blockOnAcknowledge) throws MessagingException
+                            final boolean blockOnAcknowledge,
+                            final int defaultConsumerWindowSize,  
+                            final int defaultConsumerMaxRate,
+                            final int defaultProducerWindowSize,
+                            final int defaultProducerMaxRate) throws MessagingException
    {
    	if (lazyAckBatchSize < -1 || lazyAckBatchSize == 0)
    	{
@@ -167,10 +174,14 @@
       
       this.cacheProducers = cacheProducers;
       
-      this.maxProducerRate = maxProducerRate;
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
       
-      this.producerWindowSize = producerWindowSize;
+      this.defaultConsumerMaxRate = defaultConsumerMaxRate;
       
+      this.defaultProducerWindowSize = defaultProducerWindowSize;
+      
+      this.defaultProducerMaxRate = defaultProducerMaxRate;
+      
       executor = Executors.newSingleThreadExecutor();
       
       this.lazyAckBatchSize = lazyAckBatchSize;
@@ -257,29 +268,23 @@
       checkClosed();
     
       SessionCreateConsumerMessage request =
-         new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDeleteQueue);
+         new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDeleteQueue,
+         		                           defaultConsumerWindowSize, defaultConsumerMaxRate);
       
       SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.send(id, request);
       
-      int prefetchSize = response.getPrefetchSize();
-            
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerID(),             
-                                executor, remotingConnection, direct, response.getPrefetchSize());
+         new ClientConsumerImpl(this, response.getConsumerID(), executor, remotingConnection, direct, 1);
 
       consumers.put(response.getConsumerID(), consumer);
 
       remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, response.getConsumerID()));
+      
+      //Now we send window size tokens to start the consumption
+      //We even send it if windowSize == -1, since we need to start the consumer
+      
+      remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(response.getWindowSize()), true);
 
-      if (prefetchSize > 0) 
-      {
-      	//Consumer flow control is enabled so give the server consumer some initial tokens (1.5 * prefetchSize)
-         
-         int initialTokens = prefetchSize + prefetchSize >>> 1;
-         
-         remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(initialTokens), true);
-      }
-            
       return consumer;
    }
    
@@ -302,15 +307,13 @@
 
    public ClientProducer createProducer(final String address) throws MessagingException
    {
-      return createProducer(address, producerWindowSize, maxProducerRate);
+      return createProducer(address, defaultProducerWindowSize, defaultProducerMaxRate);
    }
-   
+      
    public ClientProducer createProducer(final String address, final int windowSize, final int maxRate) throws MessagingException
    {
       checkClosed();
       
-      log.info("Creating prod, ws:" + windowSize);
-      
       ClientProducerInternal producer = null;
       
       if (cacheProducers)
@@ -320,14 +323,16 @@
 
       if (producer == null)
       {
-      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize);
+      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate);
       	
       	SessionCreateProducerResponseMessage response =
       		(SessionCreateProducerResponseMessage)remotingConnection.send(id, request);
       	
+      	//maxRate and windowSize can be overridden by the server
+      	
       	producer = new ClientProducerImpl(this, response.getProducerID(), address,
       			                            remotingConnection, response.getWindowSize(),
-      			                            maxRate);  
+      			                            response.getMaxRate());  
       	
       	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, response.getProducerID()));
       }

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -52,7 +52,8 @@
    
    List<Queue> getQueuesForAddress(String address) throws Exception;
 
-   ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize, int producerWindowSize, int producerMaxRate);
+   ClientConnectionFactory createClientConnectionFactory(boolean strictTck,
+   		int consumerWindowSize, int consumerMaxRate, int producerWindowSize, int producerMaxRate);
 
    void removeAllMessagesForAddress(String address) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -150,14 +150,15 @@
       return false;
    }
 
-   public ClientConnectionFactory createClientConnectionFactory(boolean strictTck, int prefetchSize,
+   public ClientConnectionFactory createClientConnectionFactory(boolean strictTck,
+   		                                                       int consumerWindowSize, int consumerMaxRate,
    		                                                       int producerWindowSize, int producerMaxRate)
    {
       return new ClientConnectionFactoryImpl(messagingServer.getConfiguration().getMessagingServerID(),
               messagingServer.getConfiguration(),
               messagingServer.getVersion(),
               messagingServer.getConfiguration().isStrictTck() || strictTck,
-              prefetchSize,
+              consumerWindowSize, consumerMaxRate,
               producerWindowSize, producerMaxRate);
    }
 
@@ -270,7 +271,8 @@
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
       Queue queue = binding.getQueue();
-      currentCounters.put(queueName, new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
+      currentCounters.put(queueName, new MessageCounter(queue.getName(), queue, queue.isDurable(),
+      		messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
    }
 
    public void unregisterMessageCounter(final String queueName) throws Exception
@@ -298,7 +300,8 @@
             throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
          }
          Queue queue = binding.getQueue();
-         messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
+         messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(),
+         		messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
       }
       currentCounters.put(queueName, messageCounter);
       messageCounter.resetCounter();
@@ -437,7 +440,7 @@
       List<MessageReference> allRefs = getQueue(queue).removeReferences(actFilter);
       for (MessageReference messageReference : allRefs)
       {
-         messageReference.expire(messagingServer.getPersistenceManager());
+         messageReference.expire(messagingServer.getPersistenceManager(), messagingServer.getQueueSettingsRepository());
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/message/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageReference.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageReference.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -23,6 +23,8 @@
 
 import org.jboss.messaging.core.persistence.PersistenceManager;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 /**
  * A reference to a message.
@@ -65,9 +67,9 @@
    
    void acknowledge(PersistenceManager persistenceManager) throws Exception;  
    
-   boolean cancel(PersistenceManager persistenceManager) throws Exception;  
+   boolean cancel(PersistenceManager persistenceManager, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;  
    
-   void expire(PersistenceManager persistenceManager) throws Exception;
+   void expire(PersistenceManager persistenceManager, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
 }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -26,6 +26,8 @@
 import org.jboss.messaging.core.message.MessageReference;
 import org.jboss.messaging.core.persistence.PersistenceManager;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 
 /**
@@ -128,7 +130,8 @@
       queue.referenceAcknowledged();
    }
    
-   public boolean cancel(final PersistenceManager persistenceManager) throws Exception
+   public boolean cancel(final PersistenceManager persistenceManager,
+   		                final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {      
       if (message.isDurable() && queue.isDurable())
       {
@@ -137,11 +140,11 @@
               
       queue.referenceCancelled();
 
-      int maxDeliveries = queue.getQueueSettings().getMatch(queue.getName()).getMaxDeliveryAttempts();
+      int maxDeliveries = queueSettingsRepository.getMatch(queue.getName()).getMaxDeliveryAttempts();
       
       if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
       {      	      	
-         Queue DLQ = queue.getQueueSettings().getMatch(queue.getName()).getDLQ();
+         Queue DLQ = queueSettingsRepository.getMatch(queue.getName()).getDLQ();
          
          if (DLQ != null)
          {
@@ -166,9 +169,10 @@
       }
    }
    
-   public void expire(final PersistenceManager persistenceManager) throws Exception
+   public void expire(final PersistenceManager persistenceManager,
+   		final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
-      Queue expiryQueue = queue.getQueueSettings().getMatch(queue.getName()).getExpiryQueue();
+      Queue expiryQueue = queueSettingsRepository.getMatch(queue.getName()).getExpiryQueue();
       
       if (expiryQueue != null)
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -101,7 +101,7 @@
       String targetID = packet.getTargetID();
       if (NO_ID_SET.equals(targetID))
       {
-         log.error("Packet is not handled, it has no targetID: " + packet);
+         log.error("Packet is not handled, it has no targetID: " + packet + ": " + System.identityHashCode(packet));
          return;
       }
       PacketHandler handler = getHandler(targetID);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -6,10 +6,10 @@
  */
 package org.jboss.messaging.core.remoting.impl.codec;
 
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CREATECONNECTION;
 
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
  */
@@ -43,14 +43,12 @@
       String clientVMID = request.getClientVMID();
       String username = request.getUsername();
       String password = request.getPassword();
-      int prefetchSize = request.getPrefetchSize();
 
       int bodyLength = INT_LENGTH // version
             + sizeof(remotingSessionID)
             + sizeof(clientVMID)
             + sizeof(username) 
-            + sizeof(password)
-            + INT_LENGTH;
+            + sizeof(password);
 
       out.putInt(bodyLength);
       out.putInt(version);
@@ -58,7 +56,6 @@
       out.putNullableString(clientVMID);
       out.putNullableString(username);
       out.putNullableString(password);
-      out.putInt(prefetchSize);
    }
 
    @Override
@@ -75,10 +72,8 @@
       String clientVMID = in.getNullableString();
       String username = in.getNullableString();
       String password = in.getNullableString();
-      int prefetchSize = in.getInt();
 
-      return new CreateConnectionRequest(version, remotingSessionID,
-            clientVMID, username, password, prefetchSize);
+      return new CreateConnectionRequest(version, remotingSessionID, clientVMID, username, password);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -40,14 +40,18 @@
       String filterString = request.getFilterString();
       boolean noLocal = request.isNoLocal();
       boolean autoDelete = request.isAutoDeleteQueue();
+      int windowSize = request.getWindowSize();
+      int maxRate = request.getMaxRate();
 
-      int bodyLength = sizeof(queueName) + sizeof(filterString) + 2;
+      int bodyLength = sizeof(queueName) + sizeof(filterString) + 2 + 2 * INT_LENGTH;
 
       out.putInt(bodyLength);
       out.putNullableString(queueName);
       out.putNullableString(filterString);
       out.putBoolean(noLocal);
       out.putBoolean(autoDelete);
+      out.putInt(windowSize);
+      out.putInt(maxRate);
    }
 
    @Override
@@ -64,8 +68,10 @@
       String filterString = in.getNullableString();
       boolean noLocal = in.getBoolean();
       boolean autoDelete = in.getBoolean();
+      int windowSize = in.getInt();
+      int maxRate = in.getInt();
  
-      return new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDelete);
+      return new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDelete, windowSize, maxRate);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,13 +38,14 @@
          RemotingBuffer out) throws Exception
    {
       String consumerID = response.getConsumerID();
-      int prefetchSize = response.getPrefetchSize();
+      
+      int windowSize = response.getWindowSize();
 
       int bodyLength = sizeof(consumerID) + INT_LENGTH;
        
       out.putInt(bodyLength);
       out.putNullableString(consumerID);
-      out.putInt(prefetchSize);
+      out.putInt(windowSize);
    }
 
    @Override
@@ -58,9 +59,9 @@
       }
 
       String consumerID = in.getNullableString();
-      int prefetchSize = in.getInt();
- 
-      return new SessionCreateConsumerResponseMessage(consumerID, prefetchSize);
+      int windowSize = in.getInt();
+
+      return new SessionCreateConsumerResponseMessage(consumerID, windowSize);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -42,11 +42,12 @@
    {
       String address = request.getAddress();
      
-      int bodyLength = sizeof(address) + INT_LENGTH;
+      int bodyLength = sizeof(address) + 2 * INT_LENGTH;
 
       out.putInt(bodyLength);
       out.putNullableString(address);
       out.putInt(request.getWindowSize());
+      out.putInt(request.getMaxRate());
    }
 
    @Override
@@ -62,8 +63,10 @@
       String address = in.getNullableString();
       
       int windowSize = in.getInt();
+      
+      int maxRate = in.getInt();
 
-      return new SessionCreateProducerMessage(address, windowSize);
+      return new SessionCreateProducerMessage(address, windowSize, maxRate);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -43,11 +43,12 @@
    {
       String producerID = response.getProducerID();
 
-      int bodyLength = sizeof(producerID) + INT_LENGTH;
+      int bodyLength = sizeof(producerID) + 2 * INT_LENGTH;
        
       out.putInt(bodyLength);
       out.putNullableString(producerID);
       out.putInt(response.getWindowSize());
+      out.putInt(response.getMaxRate());
    }
 
    @Override
@@ -62,8 +63,9 @@
 
       String producerID = in.getNullableString();
       int windowSize = in.getInt();
+      int maxRate = in.getInt();
  
-      return new SessionCreateProducerResponseMessage(producerID, windowSize);
+      return new SessionCreateProducerResponseMessage(producerID, windowSize, maxRate);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -27,15 +27,13 @@
    private final String clientVMID;
    private final String username;
    private final String password;
-   private final int prefetchSize;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    public CreateConnectionRequest(final int version,
-         final String remotingSessionID, final String clientVMID, final String username, final String password,
-         final int prefetchSize)
+         final String remotingSessionID, final String clientVMID, final String username, final String password)
    {
       super(CREATECONNECTION);
 
@@ -47,7 +45,6 @@
       this.clientVMID = clientVMID;
       this.username = username;
       this.password = password;
-      this.prefetchSize = prefetchSize;
    }
 
    // Public --------------------------------------------------------
@@ -90,11 +87,6 @@
       return buf.toString();
    }
 
-   public int getPrefetchSize()
-   {
-      return prefetchSize;
-   }
-
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -25,13 +25,18 @@
    private final boolean noLocal;
    
    private final boolean autoDeleteQueue;
+   
+   private final int windowSize;
+   
+   private final int maxRate;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    public SessionCreateConsumerMessage(final String queueName, final String filterString,
-   		                              final boolean noLocal, final boolean autoDeleteQueue)
+   		                              final boolean noLocal, final boolean autoDeleteQueue,
+   		                              final int windowSize, final int maxRate)
    {
       super(PacketType.SESS_CREATECONSUMER);
 
@@ -39,6 +44,8 @@
       this.filterString = filterString;
       this.noLocal = noLocal;
       this.autoDeleteQueue = autoDeleteQueue;
+      this.windowSize = windowSize;
+      this.maxRate = maxRate;
    }
 
    // Public --------------------------------------------------------
@@ -51,6 +58,8 @@
       buff.append(", filterString=" + filterString);
       buff.append(", noLocal=" + noLocal);
       buff.append(", autoDeleteQueue=" + autoDeleteQueue);
+      buff.append(", windowSize=" + windowSize);
+      buff.append(", maxRate=" + maxRate);
       buff.append("]");
       return buff.toString();
    }
@@ -74,6 +83,16 @@
    {
       return autoDeleteQueue;
    }
+   
+   public int getWindowSize()
+   {
+   	return windowSize;
+   }
+   
+   public int getMaxRate()
+   {
+   	return maxRate;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -24,20 +24,21 @@
 
    private final String consumerID;
    
-   private final int prefetchSize;
-
+   private final int windowSize;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateConsumerResponseMessage(final String consumerID, final int prefetchSize)
+   public SessionCreateConsumerResponseMessage(final String consumerID, final int windowSize)
    {
       super(SESS_CREATECONSUMER_RESP);
 
       Assert.assertValidID(consumerID);
 
       this.consumerID = consumerID;
-      this.prefetchSize = prefetchSize;
+      
+      this.windowSize = windowSize;
    }
 
    // Public --------------------------------------------------------
@@ -46,10 +47,10 @@
    {
       return consumerID;
    }
-
-   public int getPrefetchSize()
+   
+   public int getWindowSize()
    {
-      return prefetchSize;
+   	return windowSize;
    }
 
    @Override
@@ -57,7 +58,7 @@
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", consumerID=" + consumerID);
-      buf.append(", prefetchSize=" + prefetchSize);
+      buf.append(", windowSize=" + windowSize);
       buf.append("]");
       return buf.toString();
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -21,18 +21,22 @@
    private final String address;
    
    private final int windowSize;
+   
+   private final int maxRate;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerMessage(final String address, final int windowSize)
+   public SessionCreateProducerMessage(final String address, final int windowSize, final int maxRate)
    {
       super(PacketType.SESS_CREATEPRODUCER);
 
       this.address = address;
       
       this.windowSize = windowSize;
+      
+      this.maxRate = maxRate;
    }
 
    // Public --------------------------------------------------------
@@ -43,6 +47,7 @@
       StringBuffer buff = new StringBuffer(getParentString());
       buff.append(", address=" + address);
       buff.append(", windowSize=" + windowSize);
+      buff.append(", maxRate=" + maxRate);
       buff.append("]");
       return buff.toString();
    }
@@ -56,6 +61,11 @@
    {
    	return windowSize;
    }
+   
+   public int getMaxRate()
+   {
+   	return maxRate;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -23,18 +23,22 @@
    private final String producerID;
    
    private final int windowSize;
+   
+   private final int maxRate;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerResponseMessage(final String producerID, final int windowSize)
+   public SessionCreateProducerResponseMessage(final String producerID, final int windowSize, final int maxRate)
    {
       super(SESS_CREATEPRODUCER_RESP);
 
       this.producerID = producerID;
       
       this.windowSize = windowSize;
+      
+      this.maxRate = maxRate;
    }
 
    // Public --------------------------------------------------------
@@ -48,6 +52,11 @@
    {
    	return windowSize;
    }
+   
+   public int getMaxRate()
+   {
+   	return maxRate;
+   }
 
    @Override
    public String toString()
@@ -55,6 +64,7 @@
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", producerID=" + producerID);
       buf.append(", windowSize=" + windowSize);
+      buf.append(", maxRate=" + maxRate);
       buf.append("]");
       return buf.toString();
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -86,7 +86,7 @@
    
    CreateConnectionResponse createConnection(String username, String password,
                                              String remotingClientSessionID, String clientVMID,
-                                             int prefetchSize, String clientAddress) throws Exception;
+                                             String clientAddress) throws Exception;
 
    DeploymentManager getDeploymentManager();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -27,8 +27,6 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.message.MessageReference;
 import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 
 /**
@@ -108,8 +106,6 @@
    
    int getMessagesAdded();
 
-   HierarchicalRepository<QueueSettings> getQueueSettings();
-   
    FlowController getFlowController();
    
    void setFlowController(FlowController flowController);

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,4 +38,6 @@
 	void setStarted(boolean started) throws Exception;
 	
 	void receiveTokens(int tokens) throws Exception;
+	
+	void promptDelivery();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -103,10 +103,10 @@
 
    void deleteQueue(String queueName) throws Exception;
 
-   SessionCreateConsumerResponseMessage  createConsumer(String queueName, String filterString,
-                     boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+   SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString, boolean noLocal,
+   		                                              boolean autoDeleteQueue, int windowSize, int maxRate) throws Exception;
    
-   SessionCreateProducerResponseMessage createProducer(String address, int windowSize) throws Exception;   
+   SessionCreateProducerResponseMessage createProducer(String address, int windowSize, int maxRate) throws Exception;   
 
    SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -163,7 +163,7 @@
       securityDeployer = new SecurityDeployer(securityRepository);
       queueSettingsRepository.setDefault(new QueueSettings());
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
-      queueFactory = new QueueFactoryImpl(queueSettingsRepository, scheduledExecutor);
+      queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       connectionManager = new ConnectionManagerImpl();
       memoryManager = new SimpleMemoryManager();
       postOffice = new PostOfficeImpl(configuration.getMessagingServerID(),
@@ -337,7 +337,7 @@
 
    public CreateConnectionResponse createConnection(final String username, final String password,
                                                     final String remotingClientSessionID, final String clientVMID,
-                                                    final int prefetchSize, final String clientAddress)
+                                                    final String clientAddress)
       throws Exception
    {
       log.trace("creating a new connection for user " + username);
@@ -352,7 +352,8 @@
       final ServerConnection connection =
          new ServerConnectionImpl(username, password,
                           remotingClientSessionID, clientVMID, clientAddress,
-                          prefetchSize, remotingService.getDispatcher(), resourceManager, persistenceManager,
+                          remotingService.getDispatcher(), resourceManager, persistenceManager,
+                          queueSettingsRepository,
                           postOffice, securityStore, connectionManager);
 
       remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -75,7 +75,7 @@
          
          response = server.createConnection(request.getUsername(), request.getPassword(),
          		                             request.getRemotingSessionID(),
-                                            request.getClientVMID(), request.getPrefetchSize(),
+                                            request.getClientVMID(),
                                             sender.getRemoteAddress());
       }     
       else

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -43,8 +43,8 @@
 
    private final ScheduledExecutorService scheduledExecutor;
 
-   public QueueFactoryImpl(final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-   		                  final ScheduledExecutorService scheduledExecutor)
+   public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+   		final HierarchicalRepository<QueueSettings> queueSettingsRepository)
    {
       this.queueSettingsRepository = queueSettingsRepository;
       
@@ -57,7 +57,7 @@
       QueueSettings queueSettings = queueSettingsRepository.getMatch(name);
             
       Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, temporary,
-      		queueSettings.getMaxSize(), scheduledExecutor, queueSettingsRepository);
+      		queueSettings.getMaxSize(), scheduledExecutor);
 
       queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -42,8 +42,6 @@
 import org.jboss.messaging.core.server.DistributionPolicy;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 /**
  *
@@ -77,8 +75,6 @@
          
    private final ScheduledExecutorService scheduledExecutor;
 
-   private final HierarchicalRepository<QueueSettings> queueSettings;
-
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
    private final List<Consumer> consumers  = new ArrayList<Consumer>();
@@ -100,8 +96,8 @@
    private volatile FlowController flowController;
    
    public QueueImpl(final long persistenceID, final String name, final Filter filter, final boolean clustered,
-                    final boolean durable, final boolean temporary, final int maxSize, final ScheduledExecutorService scheduledExecutor,
-                    final HierarchicalRepository<QueueSettings> queueSettings)
+                    final boolean durable, final boolean temporary, final int maxSize,
+                    final ScheduledExecutorService scheduledExecutor)
    {
    	this.persistenceID = persistenceID;
 
@@ -119,8 +115,6 @@
       
       this.scheduledExecutor = scheduledExecutor;
    	
-   	this.queueSettings = queueSettings;
-   	
       direct = true;        	
    }
    
@@ -410,12 +404,7 @@
    {
       return messagesAdded.get();
    }
-   
-   public HierarchicalRepository<QueueSettings> getQueueSettings()
-   {
-      return queueSettings;
-   }
-   
+    
    public void setFlowController(final FlowController flowController)
    {
    	this.flowController = flowController;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,6 +38,8 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.ConcurrentHashSet;
 
@@ -74,14 +76,14 @@
    
    private final String clientAddress;
       
-   private final int prefetchSize;
-
    private final PacketDispatcher dispatcher;
    
    private final ResourceManager resourceManager;
    
-   private final PersistenceManager persistenceManager;   
+   private final PersistenceManager persistenceManager;  
    
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+      
    private final PostOffice postOffice;
    
    private final SecurityStore securityStore;
@@ -100,13 +102,14 @@
    // Constructors ---------------------------------------------------------------------------------
       
    public ServerConnectionImpl(final String username, final String password,
-   		                          final String remotingClientSessionID, final String jmsClientVMID,
-   		                          final String clientAddress,
-   		                          final int prefetchSize, final PacketDispatcher dispatcher,
-   		                          final ResourceManager resourceManager,
-   		                          final PersistenceManager persistenceManager,
-   		                          final PostOffice postOffice, final SecurityStore securityStore,
-   		                          final ConnectionManager connectionManager)
+   		                      final String remotingClientSessionID, final String jmsClientVMID,
+   		                      final String clientAddress,
+   		                      final PacketDispatcher dispatcher,
+   		                      final ResourceManager resourceManager,
+   		                      final PersistenceManager persistenceManager,
+   		                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+   		                      final PostOffice postOffice, final SecurityStore securityStore,
+   		                      final ConnectionManager connectionManager)
    {
    	id = UUID.randomUUID().toString();
       
@@ -118,14 +121,14 @@
 
       this.clientAddress = clientAddress;
 
-      this.prefetchSize = prefetchSize;
-
       this.dispatcher = dispatcher;
       
       this.resourceManager = resourceManager;
       
       this.persistenceManager = persistenceManager;
       
+      this.queueSettingsRepository = queueSettingsRepository;      
+      
       this.postOffice = postOffice;
       
       this.securityStore = securityStore;
@@ -151,15 +154,15 @@
                                                                final PacketSender sender) throws Exception
    {           
       ServerSession session =
-         new ServerSessionImpl(autoCommitSends, autoCommitAcks, prefetchSize, xa, this, resourceManager,
-         		sender, dispatcher, persistenceManager, postOffice, securityStore);
+         new ServerSessionImpl(autoCommitSends, autoCommitAcks, xa, this, resourceManager,
+         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore);
 
       synchronized (sessions)
       {
          sessions.put(session.getID(), session);
       }
 
-      dispatcher.register(new ServerSessionPacketHandler(session, prefetchSize));
+      dispatcher.register(new ServerSessionPacketHandler(session));
       
       return new ConnectionCreateSessionResponseMessage(session.getID());
    }
@@ -243,11 +246,6 @@
       temporaryQueues.remove(queue);      
    }
    
-   public int getPrefetchSize()
-   {
-      return prefetchSize;
-   }
-   
    public boolean isStarted()
    {
       return started;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -30,11 +30,13 @@
 import org.jboss.messaging.core.message.MessageReference;
 import org.jboss.messaging.core.persistence.PersistenceManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.TokenBucketLimiter;
 
 /**
  * Concrete implementation of a ClientConsumer. 
@@ -71,7 +73,7 @@
    
    private final boolean autoDeleteQueue;
    
-   private final boolean enableFlowControl;
+   private final TokenBucketLimiter limiter;
    
    private final String connectionID;   
    
@@ -79,20 +81,24 @@
 
    private final PersistenceManager persistenceManager;
    
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+   
    private final PostOffice postOffice;
          
    private final Object startStopLock = new Object();
 
-   private final AtomicInteger availableTokens = new AtomicInteger(0);
+   private final AtomicInteger availableTokens;
    
    private boolean started;
 
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerImpl(final Queue messageQueue, final boolean noLocal, final Filter filter,
-   		             final boolean autoDeleteQueue, final boolean enableFlowControl,
+   		             final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
    		             final String connectionID, final ServerSession sessionEndpoint,
-					       final PersistenceManager persistenceManager, final PostOffice postOffice,
+					       final PersistenceManager persistenceManager,
+					       final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+					       final PostOffice postOffice,
 					       final boolean started)
    {
    	id = UUID.randomUUID().toString();
@@ -105,21 +111,37 @@
       
       this.autoDeleteQueue = autoDeleteQueue;
       
-      this.enableFlowControl = enableFlowControl;
-      
+      if (maxRate != -1)
+      {
+      	limiter = new TokenBucketLimiter(maxRate, false);
+      }
+      else
+      {
+      	limiter = null;
+      }
+
       this.connectionID = connectionID;
 
       this.sessionEndpoint = sessionEndpoint;
 
       this.persistenceManager = persistenceManager;
       
+      this.queueSettingsRepository = queueSettingsRepository;
+      
       this.postOffice = postOffice;
       
       this.started = started;
       
+      if (enableFlowControl)
+      {
+         availableTokens = new AtomicInteger(0);
+      }
+      else
+      {
+      	availableTokens = null;
+      }
+      
       messageQueue.addConsumer(this);
-      
-      messageQueue.deliver();
    }
 
    // ServerConsumer implementation ----------------------------------------------------------------------
@@ -131,14 +153,14 @@
    
    public HandleStatus handle(MessageReference ref) throws Exception
    {
-      if (enableFlowControl && availableTokens.get() == 0)
+      if (availableTokens != null && availableTokens.get() == 0)
       {
          return HandleStatus.BUSY;
       }
 
       if (ref.getMessage().isExpired())
       {         
-         ref.expire(persistenceManager);
+         ref.expire(persistenceManager, queueSettingsRepository);
          
          return HandleStatus.HANDLED;
       }
@@ -171,7 +193,7 @@
             }            
          }
                          
-         if (enableFlowControl)
+         if (availableTokens != null)
          {
             availableTokens.decrementAndGet();
          }
@@ -238,10 +260,18 @@
    
    public void receiveTokens(final int tokens) throws Exception
    {
-      availableTokens.addAndGet(tokens);
+      int previous = availableTokens != null ? availableTokens.getAndAdd(tokens) : 0;
 
-      promptDelivery();      
+      if (previous == 0)
+      {
+      	promptDelivery();      
+      }   	
    }
+   
+   public void promptDelivery()
+   {
+      sessionEndpoint.promptDelivery(messageQueue);
+   } 
 
    // Public -----------------------------------------------------------------------------
      
@@ -252,8 +282,4 @@
    
    // Private --------------------------------------------------------------------------------------
 
-   private void promptDelivery()
-   {
-      sessionEndpoint.promptDelivery(messageQueue);
-   } 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -64,6 +64,8 @@
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
@@ -113,6 +115,8 @@
    
    private final PersistenceManager persistenceManager;
    
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+         
    private final PostOffice postOffice;
    
    private final SecurityStore securityStore;
@@ -135,10 +139,11 @@
    // ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final boolean autoCommitSends,
-                            final boolean autoCommitAcks, final int prefetchSize,
+                            final boolean autoCommitAcks,
                             final boolean xa, final ServerConnection connection,
                             final ResourceManager resourceManager, final PacketSender sender, 
                             final PacketDispatcher dispatcher, final PersistenceManager persistenceManager,
+                            final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                             final PostOffice postOffice, final SecurityStore securityStore) throws Exception
    {
    	id = UUID.randomUUID().toString();
@@ -162,6 +167,8 @@
       
       this.persistenceManager = persistenceManager;
       
+      this.queueSettingsRepository = queueSettingsRepository;
+      
       this.postOffice = postOffice;
       
       this.securityStore = securityStore;
@@ -443,7 +450,7 @@
          deliveryIDSequence -= tx.getAcknowledgementsCount();
       }
 
-      tx.rollback(persistenceManager);
+      tx.rollback(persistenceManager, queueSettingsRepository);
    }
 
    public void cancel(final long deliveryID, final boolean expired) throws Exception
@@ -466,7 +473,7 @@
             deliveries.clear();
          }
 
-         cancelTx.rollback(persistenceManager);
+         cancelTx.rollback(persistenceManager, queueSettingsRepository);
       }
       else if (expired)
       {
@@ -483,7 +490,7 @@
 
             if (delivery.getDeliveryID() == deliveryID)
             {
-               delivery.getReference().expire(persistenceManager);
+               delivery.getReference().expire(persistenceManager, queueSettingsRepository);
 
                iter.remove();
 
@@ -707,7 +714,7 @@
             XAException.XAER_PROTO,
             "Cannot rollback transaction, it is suspended " + xid); }
 
-      theTx.rollback(persistenceManager);
+      theTx.rollback(persistenceManager, queueSettingsRepository);
 
       boolean removed = resourceManager.removeTransaction(xid);
 
@@ -875,9 +882,9 @@
       }
    }
 
-   public SessionCreateConsumerResponseMessage
-      createConsumer(final String queueName, final String filterString,
-                     final boolean noLocal, final boolean autoDeleteQueue, final int prefetchSize) throws Exception
+   public SessionCreateConsumerResponseMessage createConsumer(final String queueName, final String filterString,
+                                                              final boolean noLocal, final boolean autoDeleteQueue,
+                                                              int windowSize, int maxRate) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -894,20 +901,28 @@
       {
          filter = new FilterImpl(filterString);
       }
-
+      
+      //Flow control values if specified on queue override those passed in from client
+      
+      Integer queueWindowSize = queueSettingsRepository.getMatch(queueName).getConsumerWindowSize();
+      
+      windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+      
+      Integer queueMaxRate = queueSettingsRepository.getMatch(queueName).getConsumerMaxRate();
+      
+      maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+      
       ServerConsumer consumer =
-      	new ServerConsumerImpl(binding.getQueue(), noLocal, filter, autoDeleteQueue, prefetchSize > 0, connection.getID(),
-                                    this, persistenceManager, postOffice, connection.isStarted());
+      	new ServerConsumerImpl(binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+                                this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
 
       dispatcher.register(new ServerConsumerPacketHandler(consumer));
 
-      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
-            prefetchSize);
+      SessionCreateConsumerResponseMessage response =
+      	new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
 
       consumers.put(consumer.getID(), consumer);      
-
-      log.trace(this + " created and registered " + consumer);
-
+      
       return response;
    }
 
@@ -1007,12 +1022,16 @@
     * @param windowSize - the producer window size to use for flow control.
     * Specify -1 to disable flow control completely
     * The actual window size used may be less than the specified window size if the queue's maxSize attribute
-    * is set and there are not sufficient empty spaces in the queue
+    * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window-size
+    * specified on the queue
     */
-   public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize) throws Exception
+   public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize,
+   		                                                     final int maxRate) throws Exception
    { 	
    	FlowController flowController = null;
    	
+   	final int maxRateToUse = maxRate;
+   	
    	if (address != null)
    	{
    		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
@@ -1024,9 +1043,9 @@
    	
    	dispatcher.register(new ServerProducerPacketHandler(producer));
    	
-   	int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+   	final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
    	   	   	
-   	return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse);
+   	return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
    }
    
    // Public ---------------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -67,13 +67,9 @@
 {
 	private final ServerSession session;
 	
-	private final int prefetchSize;
-	
-	public ServerSessionPacketHandler(final ServerSession session, final int prefetchSize)
+	public ServerSessionPacketHandler(final ServerSession session)
    {
 		this.session = session;
-		
-		this.prefetchSize = prefetchSize;
    }
 
    public String getID()
@@ -91,8 +87,10 @@
       case SESS_CREATECONSUMER:
       {
          SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-         response = session.createConsumer(request.getQueueName(), request
-               .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), prefetchSize);
+         
+         response = session.createConsumer(request.getQueueName(), request.getFilterString(),
+         		                            request.isNoLocal(), request.isAutoDeleteQueue(),
+         		                            request.getWindowSize(), request.getMaxRate());
          break;
       }
       case SESS_CREATEQUEUE:
@@ -131,7 +129,7 @@
       case SESS_CREATEPRODUCER:
       {
          SessionCreateProducerMessage request = (SessionCreateProducerMessage) packet;
-         response = session.createProducer(request.getAddress(), request.getWindowSize());
+         response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate());
          break;
       }
       case CLOSE:

Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -44,6 +44,7 @@
    public static final Integer DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
    public static final Integer DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
    public static final Long DEFAULT_REDELIVER_DELAY = (long) 500;
+   
 
    private Boolean clustered = false;
    private Integer maxSize = null;
@@ -53,6 +54,10 @@
    private Long redeliveryDelay = null;
    private Queue DLQ = null;
    private Queue ExpiryQueue = null;
+   private Integer consumerWindowSize = null;
+   private Integer consumerMaxRate = null;
+   private Integer producerWindowSize = null;
+   private Integer producerMaxRate = null;
 
 
    public Boolean isClustered()
@@ -115,7 +120,6 @@
       this.distributionPolicyClass = distributionPolicyClass;
    }
 
-
    public Queue getDLQ()
    {
       return DLQ;
@@ -152,7 +156,45 @@
       return DEFAULT_DISTRIBUTION_POLICY;
    }
 
+   public Integer getConsumerWindowSize()
+	{
+		return consumerWindowSize;
+	}
 
+	public void setConsumerWindowSize(Integer consumerWindowSize)
+	{
+		this.consumerWindowSize = consumerWindowSize;
+	}
+
+	public Integer getConsumerMaxRate()
+	{
+		return consumerMaxRate;
+	}
+
+	public void setConsumerMaxRate(Integer consumerMaxRate)
+	{
+		this.consumerMaxRate = consumerMaxRate;
+	}
+
+	public Integer getProducerWindowSize()
+	{
+		return producerWindowSize;
+	}
+
+	public void setProducerWindowSize(Integer producerWindowSize)
+	{
+		this.producerWindowSize = producerWindowSize;
+	}
+
+	public Integer getProducerMaxRate()
+	{
+		return producerMaxRate;
+	}
+
+	public void setProducerMaxRate(Integer producerMaxRate)
+	{
+		this.producerMaxRate = producerMaxRate;
+	}
    
 
    /**
@@ -193,5 +235,23 @@
       {
          ExpiryQueue = merged.ExpiryQueue;
       }
+      if (merged.consumerWindowSize != null)
+      {
+      	consumerWindowSize = merged.consumerWindowSize;
+      }
+      if (merged.consumerMaxRate != null)
+      {
+      	consumerMaxRate = merged.consumerMaxRate;
+      }
+      if (merged.producerWindowSize != null)
+      {
+      	producerWindowSize = merged.producerWindowSize;
+      }
+      if (merged.producerMaxRate != null)
+      {
+      	producerMaxRate = merged.producerMaxRate;
+      }
    }
+
+	
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -26,6 +26,8 @@
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.MessageReference;
 import org.jboss.messaging.core.persistence.PersistenceManager;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 /**
  * 
@@ -42,7 +44,8 @@
    
    void commit(boolean onePhase, PersistenceManager persistenceManager) throws Exception;
    
-   void rollback(PersistenceManager persistenceManager) throws Exception;   
+   void rollback(PersistenceManager persistenceManager,
+   		        final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;   
    
    void addMessage(Message message);
    

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -34,6 +34,8 @@
 import org.jboss.messaging.core.message.MessageReference;
 import org.jboss.messaging.core.persistence.PersistenceManager;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionSynchronization;
 
@@ -153,7 +155,8 @@
       clear();      
    }
    
-   public void rollback(final PersistenceManager persistenceManager) throws Exception
+   public void rollback(final PersistenceManager persistenceManager,
+   		               final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
       callSynchronizations(SyncType.BEFORE_ROLLBACK);
         
@@ -162,7 +165,7 @@
          persistenceManager.unprepareTransaction(xid, messagesToAdd, acknowledgements);             
       }
       
-      cancelDeliveries(persistenceManager);
+      cancelDeliveries(persistenceManager, queueSettingsRepository);
                         
       callSynchronizations(SyncType.AFTER_ROLLBACK);  
       
@@ -235,7 +238,8 @@
       containsPersistent = false;
    }
    
-   private void cancelDeliveries(final PersistenceManager persistenceManager) throws Exception
+   private void cancelDeliveries(final PersistenceManager persistenceManager,
+   		                        final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
       Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
       
@@ -255,7 +259,7 @@
             queueMap.put(queue, list);
          }
                  
-         if (ref.cancel(persistenceManager))
+         if (ref.cancel(persistenceManager, queueSettingsRepository))
          {
             list.add(ref);
          }

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -36,12 +36,14 @@
    Set<String> listTemporaryDestinations();
 
    boolean createConnectionFactory(String name, String clientID,
-   		                          int dupsOKBatchSize, boolean strictTck, int prefetchSize,
+   		                          int dupsOKBatchSize, boolean strictTck,
+   		                          int consumerWindowSize, int consumerMaxRate,
    		                          int producerWindowSize, int producerMaxRate,
    		                          String jndiBinding) throws Exception;
 
    boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize,
-   		                          boolean strictTck, int prefetchSize,
+   		                          boolean strictTck,
+   		                          int consumerWindowSize, int consumerMaxRate,
    		                          int producerWindowSize, int producerMaxRate,
    		                          List<String> jndiBindings) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -22,8 +22,6 @@
 package org.jboss.messaging.jms.server.impl;
 
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.deployers.Deployer;
-import org.jboss.messaging.core.deployers.DeploymentManager;
 import org.jboss.messaging.core.deployers.impl.XmlDeployer;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.jms.server.JMSServerManager;
@@ -43,7 +41,8 @@
 
    private static final String CLIENTID_ELEMENT = "client-id";
    private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
-   private static final String PREFETCH_SIZE_ELEMENT = "prefetch-size";
+   private static final String CONSUMER_WINDOW_SIZE_ELEMENT = "consumer-window-size";
+   private static final String CONSUMER_MAX_RATE = "consumer-max-rate";
    private static final String PRODUCER_WINDOW_SIZE = "producer-window-size";
    private static final String PRODUCER_MAX_RATE = "producer-max-rate";
    private static final String SUPPORTS_FAILOVER = "supports-failover";
@@ -123,21 +122,29 @@
          // See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
          NodeList attributes = node.getChildNodes();
          boolean cfStrictTck = false;
-         int prefetchSize = 150;
+         
          String clientID = null;
          int dupsOKBatchSize = 1000;
+         
+         int consumerWindowSize = 1000;
+         int consumerMaxRate = -1;         
          int producerWindowSize = 1000;
          int producerMaxRate = -1;
+         
          for (int j = 0; j < attributes.getLength(); j++)
          {
             if (STRICT_TCK.equalsIgnoreCase(attributes.item(j).getNodeName()))
             {
                cfStrictTck = Boolean.parseBoolean(attributes.item(j).getTextContent().trim());
             }
-            else if (PREFETCH_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
+            else if (CONSUMER_WINDOW_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
             {
-               prefetchSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
+               consumerWindowSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
             }
+            else if (CONSUMER_MAX_RATE.equalsIgnoreCase(attributes.item(j).getNodeName()))
+            {
+               consumerMaxRate = Integer.parseInt(attributes.item(j).getTextContent().trim());
+            }
             else if (PRODUCER_WINDOW_SIZE.equalsIgnoreCase(attributes.item(j).getNodeName()))
             {
                producerWindowSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
@@ -181,7 +188,7 @@
                String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
                String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
                jmsServerManager.createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck,
-               		prefetchSize, producerWindowSize, producerMaxRate, jndiName);
+               		consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate, jndiName);
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -242,16 +242,15 @@
    }
 
    public boolean createConnectionFactory(String name, String clientID,
-   		int dupsOKBatchSize, boolean strictTck, int prefetchSize,
+   		int dupsOKBatchSize, boolean strictTck, int consumerWindowSize, int consumerMaxRate,
    		int producerWindowSize, int producerMaxRate, String jndiBinding) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-      	log.info("^^^ creating cf with qws:" + producerWindowSize);
-      	
          ClientConnectionFactory clientConnectionFactory =
-         	messagingServerManagement.createClientConnectionFactory(strictTck, prefetchSize, producerWindowSize, producerMaxRate);
+         	messagingServerManagement.createClientConnectionFactory(strictTck,
+         			consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate);
          log.debug(this + " created local connectionFactory " + clientConnectionFactory);
          cf = new JBossConnectionFactory(clientConnectionFactory, clientID, dupsOKBatchSize);
       }
@@ -269,7 +268,7 @@
 
 
    public boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize,
-   		                                 boolean strictTck, int prefetchSize,
+   		                                 boolean strictTck, int consumerWindowSize, int consumerMaxRate,
    		                                 int producerWindowSize, int producerMaxRate,
    		                                 List<String> jndiBindings) throws Exception
    {
@@ -277,7 +276,8 @@
       if (cf == null)
       {
          ClientConnectionFactory clientConnectionFactory =
-         	messagingServerManagement.createClientConnectionFactory(strictTck, prefetchSize, producerWindowSize, producerMaxRate);
+         	messagingServerManagement.createClientConnectionFactory(strictTck,
+         			consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate);
          log.debug(this + " created local connectionFactory " + clientConnectionFactory);
          cf = new JBossConnectionFactory(clientConnectionFactory, clientID, dupsOKBatchSize);
       }

Modified: trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -90,7 +90,7 @@
 			tokensAdded = 0;
 		}
 														
-		int tokensDue = (int)(rate * (diff)  / 1000);
+		int tokensDue = (int)(rate * diff  / 1000);
 		
 		int tokensToAdd = tokensDue - tokensAdded;
 		

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -0,0 +1,232 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ * 
+ * Unit test for JournalImpl load/stop/reload, run against the in-memory FakeSequentialFileFactory
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class JournalTest extends UnitTestCase
+{
+	private static final Logger log = Logger.getLogger(JournalTest.class);
+	
+	private String journalDir = System.getProperty("user.home") + "/journal-test";
+	
+	private FakeSequentialFileFactory factory = new FakeSequentialFileFactory();
+	
+	protected void setUp() throws Exception
+	{
+		super.setUp();
+		
+		File file = new File(journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+	}
+	
+	public void testLoad() throws Exception
+	{
+		final int numFiles = 10;
+		
+		final int fileSize = 10 * 1024;
+		
+		final boolean sync = true;
+		
+		long timeStart = System.currentTimeMillis();
+		
+		JournalImpl journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+		
+		journal.load();
+		
+		long timeEnd = System.currentTimeMillis();
+		
+		assertEquals(1, journal.getFiles().size());
+		assertEquals(numFiles - 1, journal.getAvailableFiles().size());
+		assertEquals(0, journal.getFilesToDelete().size());
+		
+		assertEquals(numFiles, factory.getFileMap().size());
+		
+		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+		{
+			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+			
+			assertEquals(sync, file.isSync());
+			
+			assertTrue(file.isOpen());
+			
+			byte[] bytes = file.getData().array();
+			
+			assertEquals(fileSize, bytes.length);
+									
+			//First eight bytes should be the ordering id timestamp (a long)
+			
+			ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
+			long orderingID = bb.getLong();
+			
+			String expectedFilename =
+				journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+			
+			assertEquals(expectedFilename, file.getFileName());
+			
+			log.info("Ordering id is " + orderingID);
+			
+			assertTrue(orderingID >= timeStart);
+			
+			assertTrue(orderingID <= timeEnd);
+			
+			for (int i = 8; i < bytes.length; i++)
+			{
+				if (bytes[i] != JournalImpl.FILL_CHARACTER)
+				{
+					fail("Not filled correctly");
+				}
+			}
+		}
+		
+		journal.stop();
+		
+		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+		{
+			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+			
+			assertFalse(file.isOpen());
+		}
+		
+		assertEquals(0, journal.getFiles().size());
+		assertEquals(0, journal.getAvailableFiles().size());
+		assertEquals(0, journal.getFilesToDelete().size());
+		
+		//Now reload
+		
+		journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+		
+		log.info("******** reloading");
+		
+		journal.load();
+		
+		assertEquals(1, journal.getFiles().size());
+		assertEquals(numFiles - 1, journal.getAvailableFiles().size());
+		assertEquals(0, journal.getFilesToDelete().size());
+		
+		assertEquals(numFiles, factory.getFileMap().size());
+		
+		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+		{
+			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+			
+			assertEquals(sync, file.isSync());
+			
+			assertTrue(file.isOpen());
+			
+			byte[] bytes = file.getData().array();
+			
+			assertEquals(fileSize, bytes.length);
+									
+			//First eight bytes should be the ordering id timestamp (a long)
+			
+			ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
+			long orderingID = bb.getLong();
+			
+			String expectedFilename =
+				journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+			
+			assertEquals(expectedFilename, file.getFileName());
+			
+			log.info("Ordering id is " + orderingID);
+			
+			assertTrue(orderingID >= timeStart);
+			
+			assertTrue(orderingID <= timeEnd);
+			
+			for (int i = 8; i < bytes.length; i++)
+			{
+				if (bytes[i] != JournalImpl.FILL_CHARACTER)
+				{
+					fail("Not filled correctly");
+				}
+			}
+		}
+		
+		
+	}
+
+//	public void test1() throws Exception
+//	{		
+//		File file = new File(journalDir);
+//		
+//		JournalImpl journal = new JournalImpl(journalDir, 10 * 1024 * 1024, 10, true, factory);
+//		
+//		journal.load();
+//		
+//		long start = System.currentTimeMillis();
+//		
+//		byte[] bytes = new byte[1024];
+//
+//		for (int i = 0; i < bytes.length; i++)
+//		{
+//			if (i % 100 == 0)
+//			{
+//				bytes[i] = '\n';
+//			}
+//			else
+//			{
+//				bytes[i] = 'T';
+//			}
+//		}
+//		
+//		final int numIts = 50000;
+//		
+//		for (int i = 0; i < numIts; i++)
+//		{
+//			journal.add(1, bytes);
+//		}
+//				
+//		long end = System.currentTimeMillis();
+//		
+//		long numbytes = numIts * 1024;
+//		
+//		double actualRate = 1000 * (double)numbytes / ( end - start);
+//      
+//      log.info("Rate: (bytes/sec) " + actualRate);
+//      
+//      double recordRate = 1000 * (double)numIts / ( end - start);
+//      
+//      log.info("Rate: (records/sec) " + recordRate);
+//		
+//	}
+
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -0,0 +1,217 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit.fakes;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * 
+ * A SequentialFileFactory for tests whose files are in-memory ByteBuffers held in a map keyed by file name
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeSequentialFileFactory implements SequentialFileFactory
+{
+	private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
+		
+	private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+	
+	public SequentialFile createSequentialFile(final String fileName, final boolean sync) throws Exception
+	{
+		FakeSequentialFile sf = fileMap.get(fileName);
+		
+		if (sf == null)
+		{						
+		   sf = new FakeSequentialFile(fileName, sync);
+		   
+		   fileMap.put(fileName, sf);
+		}
+		else
+		{		
+			sf.data.position(0);
+			
+			log.info("positioning data to 0");
+		}
+						
+		return sf;
+	}
+	
+	public List<String> listFiles(String journalDir, String extension)
+	{
+		return new ArrayList<String>(fileMap.keySet());
+	}
+	
+	public Map<String, FakeSequentialFile> getFileMap()
+	{
+		return fileMap;
+	}
+	
+	public void clear()
+	{
+		fileMap.clear();
+	}
+	
+	public class FakeSequentialFile implements SequentialFile
+	{
+		private volatile boolean open;
+		
+		private final String fileName;
+		
+		private final boolean sync;
+		
+		private ByteBuffer data;
+		
+		public ByteBuffer getData()
+		{
+			return data;
+		}
+		
+		public boolean isSync()
+		{
+			return sync;
+		}
+		
+		public boolean isOpen()
+		{
+			log.info("is open" + System.identityHashCode(this) +" open is now " + open);
+			return open;
+		}
+		
+		public FakeSequentialFile(final String fileName, final boolean sync)
+		{
+			this.fileName = fileName;
+			
+			this.sync = sync;		
+		}
+
+		public void close() throws Exception
+		{
+			open = false;
+			
+			log.info("Calling close " + System.identityHashCode(this) +" open is now " + open);
+		}
+
+		public void delete() throws Exception
+		{
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			close();
+			
+			fileMap.remove(fileName);
+		}
+
+		public String getFileName()
+		{
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			return fileName;
+		}
+
+		public void open() throws Exception
+		{
+			log.info("open called");
+			
+			if (open)
+			{
+				throw new IllegalStateException("Is already open");
+			}		
+
+			open = true;
+		}
+
+		public void preAllocate(int size, byte fillCharacter) throws Exception
+		{		
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			
+			log.info("pre-allocate called " + size +" , " + fillCharacter);
+			
+			byte[] bytes = new byte[size];
+			
+			for (int i = 0; i < size; i++)
+			{
+				bytes[i] = fillCharacter;
+			}
+			
+			data = ByteBuffer.wrap(bytes);		
+		}
+
+		public void read(ByteBuffer bytes) throws Exception
+		{
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			
+			log.info("read called " + bytes.array().length);
+			
+			byte[] bytesRead = new byte[bytes.array().length];
+			
+			log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
+			
+			data.get(bytesRead);
+			
+			bytes.put(bytesRead);
+		}
+
+		public void reset() throws Exception
+		{
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			
+			log.info("reset called");
+			
+			data.position(0);
+		}
+
+		public void write(ByteBuffer bytes) throws Exception
+		{
+			if (!open)
+			{
+				throw new IllegalStateException("Is closed");
+			}
+			
+			log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+			
+			data.put(bytes);
+		}
+
+	}
+
+}

Modified: trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -85,7 +85,7 @@
    
    private Queue createQueue(int i)
    {
-      return new QueueImpl(i, "blah" + i, null, false, true, false, -1, null, null);
+      return new QueueImpl(i, "blah" + i, null, false, true, false, -1, null);
    }
    
    // The tests ----------------------------------------------------------------
@@ -305,16 +305,16 @@
    {      
       Message msg = generateMessage(1);
       
-      Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+      Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
       assertTrue(queue1.isDurable());
       
-      Queue queue2 = new QueueImpl(1, "queue1", null, false, false, false, -1, null, null);
+      Queue queue2 = new QueueImpl(1, "queue1", null, false, false, false, -1, null);
       assertFalse(queue2.isDurable());
       
-      Queue queue3 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+      Queue queue3 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
       assertTrue(queue3.isDurable());
       
-      Queue queue4 = new QueueImpl(1, "queue1", null, false, false, false, -1, null, null);
+      Queue queue4 = new QueueImpl(1, "queue1", null, false, false, false, -1, null);
       assertFalse(queue4.isDurable());
      
       MessageReference ref1 = msg.createReference(queue1);
@@ -662,13 +662,13 @@
        
    public void testAddRemoveBindings() throws Exception
    {
-      Queue queue1 = new QueueImpl(1, "queue1", new FilterImpl("a=1"), false, true, false, -1, null, null);
+      Queue queue1 = new QueueImpl(1, "queue1", new FilterImpl("a=1"), false, true, false, -1, null);
       
-      Queue queue2 = new QueueImpl(2, "queue2", new FilterImpl("a=1"), false, true, false, -1, null, null);
+      Queue queue2 = new QueueImpl(2, "queue2", new FilterImpl("a=1"), false, true, false, -1, null);
             
-      Queue queue3 = new QueueImpl(3, "queue3", new FilterImpl("a=1"), false, true, false, -1, null, null);
+      Queue queue3 = new QueueImpl(3, "queue3", new FilterImpl("a=1"), false, true, false, -1, null);
       
-      Queue queue4 = new QueueImpl(4, "queue4", new FilterImpl("a=1"), false, true, false, -1, null, null);
+      Queue queue4 = new QueueImpl(4, "queue4", new FilterImpl("a=1"), false, true, false, -1, null);
       
       String condition1 = "queue.condition1";
       
@@ -751,13 +751,13 @@
    
    public void testLoadBindings() throws Exception
    {
-      Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+      Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
       
-      Queue queue2 = new QueueImpl(2, "queue2", null, false, true, false, -1, null, null);
+      Queue queue2 = new QueueImpl(2, "queue2", null, false, true, false, -1, null);
             
-      Queue queue3 = new QueueImpl(3, "queue3", null, false, true, false, -1, null, null);
+      Queue queue3 = new QueueImpl(3, "queue3", null, false, true, false, -1, null);
       
-      Queue queue4 = new QueueImpl(4, "queue4", null, false, true, false, -1, null, null);
+      Queue queue4 = new QueueImpl(4, "queue4", null, false, true, false, -1, null);
       
       String condition1 = "queue.condition1";
       

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -464,16 +464,14 @@
       String clientVMID = randomString();
       String username = null;
       String password = null;
-      int prefetchSize = 0;
- 
+
       CreateConnectionRequest request = new CreateConnectionRequest(version,
-            remotingSessionID, clientVMID, username, password, prefetchSize);
+            remotingSessionID, clientVMID, username, password);
 
       AbstractPacketCodec<CreateConnectionRequest> codec = new CreateConnectionMessageCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, version, remotingSessionID, clientVMID, username,
-            password, prefetchSize);
+      checkBody(buffer, version, remotingSessionID, clientVMID, username, password);
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -575,13 +573,13 @@
    {      
       String destination = "queue.testCreateConsumerRequest";
       SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(destination,
-            "color = 'red'", false, false);
+            "color = 'red'", false, false, randomInt(), randomInt());
 
       AbstractPacketCodec codec = new SessionCreateConsumerMessageCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
       checkBody(buffer, request.getQueueName(), request
-            .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue());
+            .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), request.getWindowSize(), request.getMaxRate());
       buffer.rewind();
 
       Packet decodedPacket = codec.decode(buffer);
@@ -593,18 +591,19 @@
       assertEquals(request.getFilterString(), decodedRequest.getFilterString());
       assertEquals(request.isNoLocal(), decodedRequest.isNoLocal());
       assertEquals(request.isAutoDeleteQueue(), decodedRequest.isAutoDeleteQueue());
+      assertEquals(request.getWindowSize(), decodedRequest.getWindowSize());
+      assertEquals(request.getMaxRate(), decodedRequest.getMaxRate());
    }
 
    public void testCreateConsumerResponse() throws Exception
    {
+      SessionCreateConsumerResponseMessage response =
+      	new SessionCreateConsumerResponseMessage(randomString(), randomInt());
 
-      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(
-            randomString(), RandomUtil.randomInt());
-
       AbstractPacketCodec codec = new SessionCreateConsumerResponseMessageCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
-      checkBody(buffer, response.getConsumerID(), response.getPrefetchSize());
+      checkBody(buffer, response.getConsumerID(), response.getWindowSize());
       buffer.rewind();
 
       Packet decodedPacket = codec.decode(buffer);
@@ -612,19 +611,22 @@
       assertTrue(decodedPacket instanceof SessionCreateConsumerResponseMessage);
       SessionCreateConsumerResponseMessage decodedResponse = (SessionCreateConsumerResponseMessage) decodedPacket;
       assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
-      assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
+      
+      assertEquals(response.getConsumerID(), decodedResponse.getConsumerID());
+      assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
    }
    
    public void testCreateProducerRequest() throws Exception
    {      
       String destination = "queue.testCreateProducerRequest";
       int windowSize = randomInt();
-      SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination, windowSize);
+      int maxRate = randomInt();
+      SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination, windowSize, maxRate);
 
       AbstractPacketCodec codec = new SessionCreateProducerMessageCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, request.getAddress(), request.getWindowSize());
+      checkBody(buffer, request.getAddress(), request.getWindowSize(), request.getMaxRate());
       buffer.rewind();
 
       Packet decodedPacket = codec.decode(buffer);
@@ -634,17 +636,18 @@
       assertEquals(SESS_CREATEPRODUCER, decodedRequest.getType());
       assertEquals(request.getAddress(), decodedRequest.getAddress());
       assertEquals(request.getWindowSize(), decodedRequest.getWindowSize());
+      assertEquals(request.getMaxRate(), decodedRequest.getMaxRate());
    }
    
    public void testCreateProducerResponse() throws Exception
    {
       SessionCreateProducerResponseMessage response =
-      	new SessionCreateProducerResponseMessage(randomString(), randomInt());
+      	new SessionCreateProducerResponseMessage(randomString(), randomInt(), randomInt());
 
       AbstractPacketCodec codec = new SessionCreateProducerResponseMessageCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
-      checkBody(buffer, response.getProducerID(), response.getWindowSize());
+      checkBody(buffer, response.getProducerID(), response.getWindowSize(), response.getMaxRate());
       buffer.rewind();
 
       Packet decodedPacket = codec.decode(buffer);
@@ -654,6 +657,7 @@
       assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
       assertEquals(response.getProducerID(), decodedResponse.getProducerID());
       assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
+      assertEquals(response.getMaxRate(), decodedResponse.getMaxRate());
    }
 
    public void testStartConnectionMessage() throws Exception

Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -78,7 +78,7 @@
    
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, null);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
            
       //Send one scheduled
       
@@ -144,7 +144,7 @@
    
    private void testScheduled(boolean direct)
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, null);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       FakeConsumer consumer = null;
       

Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -55,14 +55,11 @@
 	
 	private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 
-   private final HierarchicalRepository<QueueSettings> queueSettings = 
-   	new HierarchicalObjectRepository<QueueSettings>();
-	
    public void testID()
    {
       final long id = 123;
       
-      Queue queue = new QueueImpl(id, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(id, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertEquals(id, queue.getPersistenceID());
       
@@ -77,40 +74,40 @@
    {
       final String name = "oobblle";
       
-      Queue queue = new QueueImpl(1, name, null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, name, null, false, true, false, -1, scheduledExecutor);
       
       assertEquals(name, queue.getName());
    }
    
    public void testClustered()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertFalse(queue.isClustered());
       
-      queue = new QueueImpl(1, "queue1", null, true, true, false, -1, scheduledExecutor, queueSettings);
+      queue = new QueueImpl(1, "queue1", null, true, true, false, -1, scheduledExecutor);
       
       assertTrue(queue.isClustered());
    }
    
    public void testDurable()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor);
       
       assertFalse(queue.isDurable());
       
-      queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertTrue(queue.isDurable());
    }
    
    public void testTemporary()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor);
       
       assertFalse(queue.isTemporary());
       
-      queue = new QueueImpl(1, "queue1", null, false, false, true, -1, scheduledExecutor, queueSettings);
+      queue = new QueueImpl(1, "queue1", null, false, false, true, -1, scheduledExecutor);
       
       assertTrue(queue.isTemporary());
    }
@@ -121,7 +118,7 @@
       
       final int id = 123;
       
-      Queue queue = new QueueImpl(id, "queue1", null, false, true, false, maxSize, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(id, "queue1", null, false, true, false, maxSize, scheduledExecutor);
       
       assertEquals(id, queue.getPersistenceID());
       
@@ -142,7 +139,7 @@
       
       Consumer cons3 = new FakeConsumer();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertEquals(0, queue.getConsumerCount());
       
@@ -183,7 +180,7 @@
    
    public void testGetSetDistributionPolicy()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertNotNull(queue.getDistributionPolicy());
       
@@ -198,7 +195,7 @@
    
    public void testGetSetFilter()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertNull(queue.getFilter());
       
@@ -208,21 +205,21 @@
       
       assertEquals(filter, queue.getFilter());
       
-      queue = new QueueImpl(1, "queue1", filter, false, true, false, -1, scheduledExecutor, queueSettings);
+      queue = new QueueImpl(1, "queue1", filter, false, true, false, -1, scheduledExecutor);
       
       assertEquals(filter, queue.getFilter());
    }
    
    public void testDefaultMaxSize()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertEquals(-1, queue.getMaxSize());        
    }
    
    public void testSimpleAddLast()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 10;
       
@@ -241,7 +238,7 @@
    
    public void testSimpleDirectDelivery()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       FakeConsumer consumer = new FakeConsumer();
       
@@ -269,7 +266,7 @@
    
    public void testSimpleNonDirectDelivery()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 10;
       
@@ -307,7 +304,7 @@
    
    public void testBusyConsumer()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       FakeConsumer consumer = new FakeConsumer();
       
@@ -351,7 +348,7 @@
    
    public void testBusyConsumerThenAddMoreMessages()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       FakeConsumer consumer = new FakeConsumer();
       
@@ -418,7 +415,7 @@
          
    public void testAddFirstAddLast()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 10;
       
@@ -473,7 +470,7 @@
    
    public void testChangeConsumersAndDeliver() throws Exception
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
                   
       final int numMessages = 10;
       
@@ -624,7 +621,7 @@
    
    public void testConsumerReturningNull()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       class NullConsumer implements Consumer
       {
@@ -652,7 +649,7 @@
    
    public void testRoundRobinWithQueueing()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
                   
@@ -697,7 +694,7 @@
    
    public void testRoundRobinDirect()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
                   
@@ -740,7 +737,7 @@
    
    public void testRemoveAllReferences()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 10;
       
@@ -778,7 +775,7 @@
    {
       final int maxSize = 20;
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, maxSize, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, maxSize, scheduledExecutor);
       
       List<MessageReference> refs = new ArrayList<MessageReference>();
       
@@ -852,7 +849,7 @@
    
    public void testWithPriorities()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 10;
       
@@ -919,7 +916,7 @@
    
    public void testConsumerWithFilterAddAndRemove()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       Filter filter = new FakeFilter("fruit", "orange");
       
@@ -928,7 +925,7 @@
    
    public void testList()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 20;
       
@@ -952,7 +949,7 @@
    
    public void testListWithFilter()
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       final int numMessages = 20;
       
@@ -1018,7 +1015,7 @@
    
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       Filter filter = new FakeFilter("fruit", "orange");
       
@@ -1091,7 +1088,7 @@
    
    private void testConsumerWithFilters(boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       Filter filter = new FakeFilter("fruit", "orange");
       

Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -28,9 +28,6 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 
 /**
  * 
@@ -43,14 +40,10 @@
 {
 	private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 
-   private final HierarchicalRepository<QueueSettings> queueSettings = 
-   	new HierarchicalObjectRepository<QueueSettings>();
-	
 	public Queue createQueue(long persistenceID, String name, Filter filter,
 			                   boolean durable, boolean temporary)
 	{
-		return new QueueImpl(persistenceID, name, filter, false, durable, temporary, -1,
-				scheduledExecutor, queueSettings);
+		return new QueueImpl(persistenceID, name, filter, false, durable, temporary, -1, scheduledExecutor);
 	}
 
 }

Modified: trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -52,8 +52,8 @@
       QueueSettings queueSettings = new QueueSettings();
       QueueSettings queueSettingsToMerge = new QueueSettings();
       queueSettingsToMerge.setClustered(true);
-      Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
-      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+      Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
       queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -77,8 +77,8 @@
       QueueSettings queueSettings = new  QueueSettings();
       QueueSettings queueSettingsToMerge = new QueueSettings();
       queueSettingsToMerge.setClustered(true);
-       Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
-      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+       Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
       queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -89,7 +89,7 @@
 
       QueueSettings queueSettingsToMerge2 = new QueueSettings();
       queueSettingsToMerge2.setClustered(true);
-      Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null, null);
+      Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null);
       queueSettingsToMerge2.setExpiryQueue(exp2);
       queueSettingsToMerge2.setMaxSize(2001);
       queueSettingsToMerge2.setRedeliveryDelay((long)2003);
@@ -111,8 +111,8 @@
       QueueSettings queueSettings = new  QueueSettings();
       QueueSettings queueSettingsToMerge = new QueueSettings();
       queueSettingsToMerge.setClustered(true);
-       Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
-      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+       Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+      Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
       queueSettingsToMerge.setDLQ(DLQ);
       queueSettingsToMerge.setExpiryQueue(exp);
       queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -123,8 +123,8 @@
 
       QueueSettings queueSettingsToMerge2 = new QueueSettings();
       queueSettingsToMerge2.setClustered(false);
-      Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null, null);
-      Queue DLQ2 = new QueueImpl(0,"testDlq2", null, false, false, false, 0, null, null);
+      Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null);
+      Queue DLQ2 = new QueueImpl(0,"testDlq2", null, false, false, false, 0, null);
       queueSettingsToMerge2.setExpiryQueue(exp2);
       queueSettingsToMerge2.setDLQ(DLQ2);
       queueSettingsToMerge2.setMaxDeliveryAttempts(2000);

Modified: trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -50,7 +50,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -82,7 +82,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -100,7 +100,7 @@
       
       EasyMock.replay(pm);
       
-      tx.rollback(pm);
+      tx.rollback(pm, queueSettings);
       
       EasyMock.verify(pm);
  
@@ -113,7 +113,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -146,7 +146,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -187,7 +187,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -221,7 +221,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -253,7 +253,7 @@
       
       EasyMock.replay(pm);
       
-      tx.rollback(pm);
+      tx.rollback(pm, queueSettings);
       
       EasyMock.verify(pm);
    }
@@ -264,7 +264,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -304,7 +304,7 @@
       
       EasyMock.replay(sync);
       
-      tx.rollback(pm);
+      tx.rollback(pm, queueSettings);
       
       EasyMock.verify(sync);            
    }
@@ -315,7 +315,7 @@
       
       List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
       
-      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+      Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
       
       MessageReference ref1 = this.generateReference(queue, 1);
       msgsToAdd.add(ref1.getMessage());
@@ -361,7 +361,7 @@
       EasyMock.replay(sync);
       
       tx.prepare(pm);
-      tx.rollback(pm);
+      tx.rollback(pm, queueSettings);
       
       EasyMock.verify(sync);            
    }

Modified: trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -105,7 +105,7 @@
 		
 		long start = System.currentTimeMillis();
 		
-		int count = 0;
+		long count = 0;
 		
 		final long measureTime = 5000;
 		

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -191,7 +191,7 @@
 
    public void testCreateAndDestroyConectionFactory() throws Exception
    {
-      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, 0, 0, "newtestcf");
+      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 1000, -1, 1000, -1, "newtestcf");
       JBossConnectionFactory jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
       assertNotNull(jbcf);
       assertNotNull(jbcf.getDelegate());
@@ -208,7 +208,7 @@
       ArrayList<String> bindings = new ArrayList<String>();
       bindings.add("oranewtestcf");
       bindings.add("newtestcf");
-      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, 1000, 0, bindings);
+      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 1000, -1, 1000, -1, bindings);
       jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
       assertNotNull(jbcf);
       assertNotNull(jbcf.getDelegate());

Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-03-07 16:33:34 UTC (rev 3853)
@@ -601,9 +601,9 @@
 
    public void deployConnectionFactory(String objectName,
                                        List<String> jndiBindings,
-                                       int prefetchSize) throws Exception
+                                       int consumerWindowSize) throws Exception
    {
-      deployConnectionFactory(null, objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false, -1);
+      deployConnectionFactory(null, objectName, jndiBindings, consumerWindowSize, -1, -1, -1, false, false, false, -1);
    }
 
 
@@ -613,7 +613,6 @@
       deployConnectionFactory(null, objectName, jndiBindings, -1, -1, -1, -1, false, false, false, -1);
    }
 
-
    public void deployConnectionFactory(String objectName, List<String> jndiBindings, boolean strictTck) throws Exception
    {
       deployConnectionFactory(null, objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck, -1);
@@ -652,7 +651,7 @@
    {
       log.info("deploying connection factory with name: " + objectName + " and dupsok: " + dupsOkBatchSize);
       getJMSServerManager().createConnectionFactory(objectName, clientId, dupsOkBatchSize,
-      		strictTck, prefetchSize, 1000, -1, jndiBindings);
+      		strictTck, prefetchSize, -1, 1000, -1, jndiBindings);
    }
 
 




More information about the jboss-cvs-commits mailing list