[jboss-cvs] JBoss Messaging SVN: r5228 - in trunk: src/main/org/jboss/messaging/core/client and 19 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 30 16:13:21 EDT 2008


Author: timfox
Date: 2008-10-30 16:13:20 -0400 (Thu, 30 Oct 2008)
New Revision: 5228

Removed:
   trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
Modified:
   trunk/src/config/jbm-jndi.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
Removed flow control and re-implemented on connection - not finished yet


Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/config/jbm-jndi.xml	2008-10-30 20:13:20 UTC (rev 5228)
@@ -52,7 +52,7 @@
       <!-- The batch size to use when using the DUPS_OK_ACKNOWLEDGE acknowledgement mode -->
       <dups-ok-batch-size>5000</dups-ok-batch-size>
       <!-- This is the window size in number of messages to use when using producer window based flow control -->
-      <producer-window-size>1000</producer-window-size>
+      <producer-window-size>1048576</producer-window-size>
       <!-- This is the maximum producer send rate that will be applied when using rate based producer flow control -->
       <producer-max-rate>100</producer-max-rate>
       <!-- This is the window size in number of messages to use when using consumer window based flow control -->

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -56,7 +56,4 @@
    boolean isBlockOnNonPersistentSend();
    
    int getMaxRate();
-   
-   int getInitialWindowSize();
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -72,15 +72,12 @@
    ClientProducer createProducer(SimpleString address) throws MessagingException;
 
    ClientProducer createProducer(SimpleString address,
-                                 int windowSize,
                                  int maxRate,
                                  boolean blockOnNonPersistentSend,
                                  boolean blockOnPersistentSend) throws MessagingException;
 
-   ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException;
+   ClientProducer createProducer(SimpleString address, int rate) throws MessagingException;
 
-   ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException;
-
    XAResource getXAResource();
 
    void commit() throws MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -49,9 +49,9 @@
 
    int getConsumerWindowSize();
 
-   void setProducerWindowSize(int size);
+   void setSendWindowSize(int size);
 
-   int getProducerWindowSize();
+   int getSendWindowSize();
 
    void setConsumerMaxRate(int rate);
 
@@ -87,8 +87,6 @@
 
    long getPingPeriod();
    
-   int getPingPoolSize();
-
    long getCallTimeout();   
    
    int getMaxConnections();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -12,8 +12,6 @@
 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.concurrent.Semaphore;
-
 import org.jboss.messaging.core.client.AcknowledgementHandler;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -55,10 +53,6 @@
 
    private volatile boolean closed;
 
-   // For limit throttling
-
-   private final Semaphore availableCredits;
-
    // For rate throttling
 
    private final TokenBucketLimiter rateLimiter;
@@ -67,10 +61,6 @@
 
    private final boolean blockOnPersistentSend;
 
-   private final boolean creditFlowControl;
-
-   private final int initialWindowSize;
-
    private final SimpleString autoGroupId;
 
    // Static ---------------------------------------------------------------------------------------
@@ -84,7 +74,6 @@
                              final boolean blockOnNonPersistentSend,
                              final boolean blockOnPersistentSend,
                              final SimpleString autoGroupId,
-                             final int initialCredits,
                              final Channel channel)
    {
       this.channel = channel;
@@ -102,12 +91,6 @@
       this.blockOnPersistentSend = blockOnPersistentSend;
 
       this.autoGroupId = autoGroupId;
-
-      availableCredits = new Semaphore(initialCredits);
-
-      creditFlowControl = initialCredits != -1;
-
-      initialWindowSize = initialCredits;
    }
 
    // ClientProducer implementation ----------------------------------------------------------------
@@ -245,11 +228,6 @@
       return blockOnNonPersistentSend;
    }
 
-   public int getInitialWindowSize()
-   {
-      return initialWindowSize;
-   }
-
    public int getMaxRate()
    {
       return rateLimiter == null ? -1 : rateLimiter.getRate();
@@ -262,16 +240,6 @@
       return id;
    }
 
-   public void receiveCredits(final int credits)
-   {
-      availableCredits.release(credits);
-   }
-
-   public int getAvailableCredits()
-   {
-      return availableCredits.availablePermits();
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -332,18 +300,6 @@
       {
          channel.send(message);
       }
-
-//      // We only flow control with non-anonymous producers
-//      if (address == null && creditFlowControl)
-//      {
-//         try
-//         {
-//            availableCredits.acquire(message.getClientMessage().getEncodeSize());
-//         }
-//         catch (InterruptedException e)
-//         {
-//         }
-//      }
    }
 
    private void checkClosed() throws MessagingException

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -35,9 +35,5 @@
 {
    long getID();
    
-	void receiveCredits(int credits) throws Exception;
-	
-	int getAvailableCredits();
-	
    void cleanUp();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -58,13 +58,11 @@
 
    public static final long DEFAULT_PING_PERIOD = 5000;
 
-   public static final int DEFAULT_PING_POOL_SIZE = 5;
-
    public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
 
    public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
 
-   public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+   public static final int DEFAULT_SEND_WINDOW_SIZE = 1024 * 1024;
 
    public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
 
@@ -94,8 +92,6 @@
 
    private final long pingPeriod;
 
-   private final int pingPoolSize;
-
    private final long callTimeout;
 
    private final int maxConnections;
@@ -108,7 +104,7 @@
 
    private volatile int consumerMaxRate;
 
-   private volatile int producerWindowSize;
+   private volatile int sendWindowSize;
 
    private volatile int producerMaxRate;
 
@@ -142,12 +138,11 @@
     */
    public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
                                    final TransportConfiguration backupConfig,
-                                   final long pingPeriod,
-                                   final int pingPoolSize,
+                                   final long pingPeriod,                                 
                                    final long callTimeout,
                                    final int consumerWindowSize,
                                    final int consumerMaxRate,
-                                   final int producerWindowSize,
+                                   final int sendWindowSize,
                                    final int producerMaxRate,
                                    final boolean blockOnAcknowledge,
                                    final boolean blockOnNonPersistentSend,
@@ -164,7 +159,7 @@
                                                     pingPeriod,
                                                     callTimeout,
                                                     maxConnections,
-                                                    pingPoolSize);
+                                                    sendWindowSize);
       if (backupConfig != null)
       {
          backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
@@ -176,7 +171,7 @@
                                                              pingPeriod,
                                                              callTimeout,
                                                              maxConnections,
-                                                             pingPoolSize);
+                                                             sendWindowSize);
       }
       else
       {
@@ -187,11 +182,10 @@
          backupConnectionManager = null;
       }
       this.pingPeriod = pingPeriod;
-      this.pingPoolSize = pingPoolSize;
       this.callTimeout = callTimeout;
       this.consumerWindowSize = consumerWindowSize;
       this.consumerMaxRate = consumerMaxRate;
-      this.producerWindowSize = producerWindowSize;
+      this.sendWindowSize = sendWindowSize;
       this.producerMaxRate = producerMaxRate;
       this.blockOnAcknowledge = blockOnAcknowledge;
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
@@ -206,11 +200,10 @@
       this(connectorConfig,
            backupConfig,
            DEFAULT_PING_PERIOD,
-           DEFAULT_PING_POOL_SIZE,
            DEFAULT_CALL_TIMEOUT,
            DEFAULT_CONSUMER_WINDOW_SIZE,
            DEFAULT_CONSUMER_MAX_RATE,
-           DEFAULT_PRODUCER_WINDOW_SIZE,
+           DEFAULT_SEND_WINDOW_SIZE,
            DEFAULT_PRODUCER_MAX_RATE,
            DEFAULT_BLOCK_ON_ACKNOWLEDGE,
            DEFAULT_BLOCK_ON_PERSISTENT_SEND,
@@ -259,14 +252,14 @@
       consumerWindowSize = size;
    }
 
-   public int getProducerWindowSize()
+   public int getSendWindowSize()
    {
-      return producerWindowSize;
+      return sendWindowSize;
    }
 
-   public void setProducerWindowSize(final int size)
+   public void setSendWindowSize(final int size)
    {
-      producerWindowSize = size;
+      sendWindowSize = size;
    }
 
    public int getProducerMaxRate()
@@ -354,11 +347,6 @@
       return pingPeriod;
    }
 
-   public int getPingPoolSize()
-   {
-      return pingPoolSize;
-   }
-
    public long getCallTimeout()
    {
       return callTimeout;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -371,39 +371,22 @@
       return consumer;
    }
 
-
    public ClientProducer createProducer(final SimpleString address) throws MessagingException
    {      
       checkClosed();
 
-      return createProducer(address, connectionFactory.getProducerWindowSize(), connectionFactory.getProducerMaxRate());
+      return createProducer(address, connectionFactory.getProducerMaxRate());
    }
 
-   public ClientProducer createRateLimitedProducer(final SimpleString address, final int rate) throws MessagingException
+   public ClientProducer createProducer(final SimpleString address, final int maxRate) throws MessagingException
    {
-      checkClosed();
-
-      return createProducer(address, -1, rate);
-   }
-
-   public ClientProducer createProducerWithWindowSize(final SimpleString address, final int windowSize) throws MessagingException
-   {
-      checkClosed();
-
-      return createProducer(address, windowSize, -1);
-   }
-
-   private ClientProducer createProducer(final SimpleString address, final int windowSize, final int maxRate) throws MessagingException
-   {
-      return createProducer(address,
-                            windowSize,
+      return createProducer(address,               
                             maxRate,
                             connectionFactory.isBlockOnNonPersistentSend(),
                             connectionFactory.isBlockOnPersistentSend());
    }
 
    public ClientProducer createProducer(final SimpleString address,
-                                        final int windowSize,
                                         final int maxRate,
                                         final boolean blockOnNonPersistentSend,
                                         final boolean blockOnPersistentSend) throws MessagingException
@@ -419,8 +402,7 @@
 
       if (producer == null)
       {
-         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,
-                                                                                 windowSize,
+         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,                                                                             
                                                                                  maxRate,
                                                                                  autoGroupId);
 
@@ -440,8 +422,7 @@
                                                                                                    false),
                                            autoCommitSends && blockOnNonPersistentSend,
                                            autoCommitSends && blockOnPersistentSend,
-                                           response.getAutoGroupId(),
-                                           response.getInitialCredits(),
+                                           response.getAutoGroupId(),                                          
                                            channel);
       }
 
@@ -661,16 +642,6 @@
       }
    }
 
-   public void receiveProducerCredits(final long producerID, final int credits) throws Exception
-   {
-      ClientProducerInternal producer = producers.get(producerID);
-
-      if (producer != null)
-      {
-         producer.receiveCredits(credits);
-      }
-   }
-
    public void close() throws MessagingException
    {     
       if (closed)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -46,8 +46,6 @@
 
    Map<SimpleString, ClientProducerInternal> getProducerCache();
 
-   void receiveProducerCredits(long producerID, int credits) throws Exception;
-
    void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
 
    void handleFailover();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -23,14 +23,12 @@
 package org.jboss.messaging.core.client.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 
 /**
@@ -59,14 +57,6 @@
       {
          switch (type)
          {
-            case SESS_RECEIVETOKENS:
-            {
-               SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage) packet;
-   
-               clientSession.receiveProducerCredits(message.getProducerID(), message.getTokens());
-               
-               break;
-            }
             case SESS_RECEIVE_MSG:
             {
                SessionReceiveMessage message = (SessionReceiveMessage) packet;

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.ServerProducer;
-
-
-/**
- * 
- * A FlowController
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface FlowController
-{
-	void messageAcknowledged() throws Exception;
-	
-	void requestAndSendCredits(ServerProducer producer, int windowSize) throws Exception;
-	
-	int getInitialCredits(int windowSize, ServerProducer producer) throws Exception;
-}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -73,11 +73,7 @@
    Binding getBinding(SimpleString queueName);
       
    List<MessageReference> route(ServerMessage message) throws Exception;
-   
-   //Flow control
-   
-   FlowController getFlowController(SimpleString address);
-     
+      
    //For testing only
    Map<SimpleString, List<Binding>> getMappings();
 

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,177 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A FlowControllerImpl
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class FlowControllerImpl implements FlowController
-{
-	private static final Logger log = Logger.getLogger(FlowControllerImpl.class);
-	
-	private int lastPot;
-		
-	private int creditPot;
-	
-	private final PostOffice postOffice;
-	
-	private final SimpleString address;
-	
-	private final java.util.Queue<ServerProducer> waitingList = new ConcurrentLinkedQueue<ServerProducer>();
-	
-	public FlowControllerImpl(final SimpleString address, final PostOffice postOffice) throws Exception
-	{
-		this.address = address;
-		
-		this.postOffice = postOffice;
-	}
-	
-	public synchronized int getInitialCredits(final int windowSize, final ServerProducer producer) throws Exception
-	{	     
-      fillPot();
-      
-		int num = Math.min(windowSize, creditPot);
-		
-		creditPot -= num;
-		
-		if (num <= 0)
-		{
-			//Register producer as a waiter or will never get any messages
-			
-			producer.setWaiting(true);
-			
-			waitingList.add(producer);
-		}
-		
-		return num;
-	}
-				
-	//FIXME - sort out the synchronization on this - don't want to lock the whole thing
-	//also don't want to execute the whole method if already waiting
-	public synchronized void messageAcknowledged() throws Exception
-	{		
-//	   log.info("acking");
-//	   
-//		fillPot();
-//		
-//		log.info("Filled pot is now " + creditPot);
-//			
-//		while (creditPot > 0)
-//		{
-//			ServerProducer producer = waitingList.poll();
-//			
-//			if (producer == null)
-//			{
-//				break;
-//			}
-//			
-//			producer.setWaiting(false);
-//			
-//			producer.requestAndSendCredits();
-//		}					
-	}
-		
-	public synchronized void requestAndSendCredits(final ServerProducer producer, final int credits) throws Exception
-	{		
-		if (creditPot <= 0)
-		{
-			if (!producer.isWaiting())
-			{
-				producer.setWaiting(true);
-				
-				waitingList.add(producer);
-			}
-		}
-		else
-		{
-		   int creditsToTake = Math.min(credits, creditPot);
-		   
-			//creditPot -= creditsToTake;
-			
-			producer.sendCredits(creditsToTake);
-		}
-	}
-			
-	private void fillPot() throws Exception
-	{
-	 //TODO - for now we don't take max size into account
-	   
-//		List<Binding> bindings = postOffice.getBindingsForAddress(address);
-//		
-//		int minAvailable = Integer.MAX_VALUE;
-//		
-//		for (Binding binding: bindings)
-//		{
-//			Queue queue = binding.getQueue();
-//			
-//			int maxSize = queue.getMaxSizeBytes();
-//			
-//			
-//			//log.info("max size is " + maxSize);
-//			
-//			int available;
-//			
-//			if (maxSize == -1)
-//			{
-//				available = Integer.MAX_VALUE;
-//			}
-//			else
-//			{
-//				available = maxSize - queue.getSizeBytes();
-//				
-//				log.info("Available is " + available);
-//			}
-//			
-//			if (available < 0)
-//			{
-//				available = 0;
-//			}
-//			
-//			minAvailable = Math.min(available, minAvailable);		
-//			
-//			log.info("min available is " + minAvailable);
-//		}
-//						
-//		log.info("lastpot is " + lastPot);
-//		if (minAvailable > lastPot)
-//		{
-//			creditPot += minAvailable - lastPot;
-//			
-//			lastPot = minAvailable;
-//		}
-		
-		creditPot = Integer.MAX_VALUE;
-	}
-}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -28,8 +28,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
@@ -40,7 +38,6 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.AddressManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
@@ -65,8 +62,6 @@
 
    private final AddressManager addressManager;
 
-   private final ConcurrentMap<SimpleString, FlowController> flowControllers = new ConcurrentHashMap<SimpleString, FlowController>();
-
    private final QueueFactory queueFactory;
 
    private final boolean checkAllowable;
@@ -163,8 +158,7 @@
          {
             storageManager.addDestination(address);
          }
-
-         flowControllers.put(address, new FlowControllerImpl(address, this));
+       
          managementService.registerAddress(address);
       }
 
@@ -176,9 +170,7 @@
       boolean removed = addressManager.removeDestination(address);
 
       if (removed)
-      {
-         flowControllers.remove(address);
-
+      {         
          if (durable)
          {
             storageManager.deleteDestination(address);
@@ -316,11 +308,6 @@
       return addressManager.getMappings();
    }
 
-   public FlowController getFlowController(final SimpleString address)
-   {
-      return flowControllers.get(address);
-   }
-   
    public synchronized void activate()
    {
       this.backup = false;
@@ -378,10 +365,6 @@
       managementService.registerQueue(binding.getQueue(), binding.getAddress(), storageManager);
 
       addressManager.addBinding(binding);
-
-      FlowController flowController = flowControllers.get(binding.getAddress());
-
-      binding.getQueue().setFlowController(flowController);
    }
 
    private Binding removeQueueInMemory(final SimpleString queueName) throws Exception
@@ -391,8 +374,6 @@
       if (addressManager.removeMapping(binding.getAddress(), queueName))
       {
          managementService.unregisterAddress(binding.getAddress());
-
-         binding.getQueue().setFlowController(null);
       }
 
       return binding;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -41,11 +41,13 @@
    
    byte getType();
 
-   void encode(MessagingBuffer buffer);
+   int encode(MessagingBuffer buffer);
       
    void decode(MessagingBuffer buffer);
    
    boolean isRequiresConfirmations();
    
    boolean isWriteAlways();     
+   
+   int getPacketSize();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -52,47 +52,51 @@
 public class ConnectionManagerImpl implements ConnectionManager, ConnectionLifeCycleListener
 {
    private static final Logger log = Logger.getLogger(ConnectionManagerImpl.class);
-   
+
    private final ConnectorFactory connectorFactory;
-   
+
    private final Map<String, Object> params;
-   
+
    private final long pingInterval;
-   
+
    private final long callTimeout;
-   
+
    private final int maxConnections;
-       
-   //TODO - allow this to be configurable
+
+   // TODO - allow this to be configurable
    private static final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(5,
-                                                                             new JBMThreadFactory("jbm-pinger-threads"));
-   
+                                                                                                   new JBMThreadFactory("jbm-pinger-threads"));
+
    private final Map<Object, ConnectionEntry> connections = new LinkedHashMap<Object, ConnectionEntry>();
-      
+
    private int refCount;
-   
+
    private Iterator<ConnectionEntry> mapIterator;
+
+   private Object failConnectionLock = new Object();
    
-   private Object failConnectionLock = new Object();
-      
+   private final int sendWindowSize;
+
    public ConnectionManagerImpl(final ConnectorFactory connectorFactory,
                                 final Map<String, Object> params,
                                 final long pingInterval,
                                 final long callTimeout,
                                 final int maxConnections,
-                                final int pingPoolSize)  // FIXME - pingPoolSize is not used
+                                final int sendWindowSize)
    {
       this.connectorFactory = connectorFactory;
-      
+
       this.params = params;
-      
+
       this.pingInterval = pingInterval;
-      
+
       this.callTimeout = callTimeout;
+
+      this.maxConnections = maxConnections;
       
-      this.maxConnections = maxConnections; 
+      this.sendWindowSize = sendWindowSize;
    }
-    
+
    public RemotingConnection createConnection()
    {
       DelegatingBufferHandler handler = new DelegatingBufferHandler();
@@ -110,7 +114,7 @@
          throw new IllegalStateException("Failed to connect");
       }
 
-      RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
+      RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
 
       handler.conn = connection;
 
@@ -120,7 +124,7 @@
 
       return connection;
    }
-   
+
    public synchronized RemotingConnection getConnection()
    {
       RemotingConnection conn;
@@ -142,7 +146,7 @@
             throw new IllegalStateException("Failed to connect");
          }
 
-         conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
+         conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
 
          handler.conn = conn;
 
@@ -153,108 +157,108 @@
       else
       {
          // Return one round-robin from the list
-         
+
          if (mapIterator == null || !mapIterator.hasNext())
          {
             mapIterator = connections.values().iterator();
          }
 
          ConnectionEntry entry = mapIterator.next();
-         
-         conn = entry.connection;            
+
+         conn = entry.connection;
       }
-      
+
       refCount++;
-      
+
       return conn;
    }
-   
+
    public synchronized void returnConnection(final Object connectionID)
-   {              
+   {
       ConnectionEntry entry = connections.get(connectionID);
-      
+
       if (refCount != 0)
       {
          refCount--;
       }
-      
+
       if (entry != null)
-      {                  
+      {
          checkCloseConnections();
       }
       else
       {
-         //Can be legitimately null if session was closed before then went to remove session from csf
-         //and locked since failover had started then after failover removes it but it's already been failed
+         // Can be legitimately null if session was closed before then went to remove session from csf
+         // and locked since failover had started then after failover removes it but it's already been failed
       }
    }
-    
+
    public void failConnection(final MessagingException me)
-   {      
+   {
       synchronized (failConnectionLock)
       {
-         //When a single connection fails, we fail *all* the connections
-         
-         Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());         
-         
-         for (ConnectionEntry entry: copy)
+         // When a single connection fails, we fail *all* the connections
+
+         Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
+
+         for (ConnectionEntry entry : copy)
          {
-            entry.connection.fail(me);      
+            entry.connection.fail(me);
          }
-         
+
          refCount = 0;
       }
    }
-   
+
    public synchronized int getRefCount()
    {
       return refCount;
    }
-   
+
    public synchronized int numConnections()
-   {    
+   {
       return connections.size();
    }
-   
+
    public synchronized Set<RemotingConnection> getConnections()
    {
       Set<RemotingConnection> conns = new HashSet<RemotingConnection>();
-      
-      for (ConnectionEntry entry: connections.values())
+
+      for (ConnectionEntry entry : connections.values())
       {
          conns.add(entry.connection);
       }
-      
+
       return conns;
    }
-   
+
    // Private -------------------------------------------------------
-   
+
    private void checkCloseConnections()
    {
       if (refCount == 0)
       {
-         //Close connections
-            
+         // Close connections
+
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
-         
-         connections.clear();    
-         
-         for (ConnectionEntry entry: copy)
+
+         connections.clear();
+
+         for (ConnectionEntry entry : copy)
          {
             try
             {
                entry.connection.destroy();
-            
+
                entry.connector.close();
             }
             catch (Throwable ignore)
-            {                  
-            }                                    
-         }                                    
+            {
+            }
+         }
       }
    }
-   
+
    // ConnectionLifeCycleListener implementation --------------------
 
    public void connectionCreated(final Connection connection)
@@ -262,37 +266,36 @@
    }
 
    public void connectionDestroyed(final Object connectionID)
-   {      
+   {
       // If conn still exists here this means that the underlying transport
       // conn has been closed from the server side without
       // being returned from the client side so we need to fail the conn and
       // call it's listeners
-      MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED,
-                                                     "The conn has been closed.");
-      failConnection(me);              
+      MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED, "The conn has been closed.");
+      failConnection(me);
    }
 
    public void connectionException(final Object connectionID, final MessagingException me)
    {
-      failConnection(me);      
+      failConnection(me);
    }
-   
+
    // Inner classes ----------------------------------------------------------------
-   
+
    private class ConnectionEntry
    {
       ConnectionEntry(final RemotingConnection connection, final Connector connector)
       {
          this.connection = connection;
-         
+
          this.connector = connector;
       }
-      
+
       final RemotingConnection connection;
-      
+
       final Connector connector;
    }
-   
+
    private class DelegatingBufferHandler extends AbstractBufferHandler
    {
       RemotingConnection conn;
@@ -302,7 +305,7 @@
          conn.bufferReceived(connectionID, buffer);
       }
    }
-   
+
    private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
    {
       private RemotingConnection conn;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -43,7 +43,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
@@ -79,6 +78,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -119,7 +119,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
@@ -217,6 +216,10 @@
    private boolean frozen;
    
    private final Object failLock = new Object();
+   
+   private final int sendWindowSize;
+   
+   private final Semaphore sendSemaphore;
       
    // debug only stuff
 
@@ -232,9 +235,10 @@
                                  final long blockingCallTimeout,
                                  final long pingPeriod,
                                  final ScheduledExecutorService pingExecutor,
-                                 final List<Interceptor> interceptors)
+                                 final List<Interceptor> interceptors,
+                                 final int sendWindowSize)
    {
-      this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true);
+      this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true, sendWindowSize);
    }
 
    /*
@@ -254,7 +258,8 @@
            interceptors,
            replicatingConnection,
            active,
-           false);
+           false,
+           -1);
    }
    
    private RemotingConnectionImpl(final Connection transportConnection,
@@ -264,7 +269,8 @@
                                   final List<Interceptor> interceptors,
                                   final RemotingConnection replicatingConnection,
                                   final boolean active,
-                                  final boolean client)
+                                  final boolean client,
+                                  final int sendWindowSize)
 
    {
       this.transportConnection = transportConnection;
@@ -289,6 +295,17 @@
       this.client = client;
 
       this.createdActive = active;
+      
+      this.sendWindowSize = sendWindowSize;
+      
+      if (sendWindowSize != -1)
+      {        
+         this.sendSemaphore = new Semaphore(sendWindowSize, true);
+      }
+      else
+      {
+         this.sendSemaphore = null;
+      }
    }
 
    public void startPinger()
@@ -521,13 +538,22 @@
       }
    }
 
-   // private static AtomicInteger specialSeq = new AtomicInteger(0);
-
    private void doWrite(final Packet packet)
    {      
       final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
-      packet.encode(buffer);
+      int size = packet.encode(buffer);
+      
+      if (packet.isRequiresConfirmations() && sendSemaphore != null)
+      {
+         try
+         {
+            sendSemaphore.acquire(size);
+         }
+         catch (InterruptedException e)
+         {            
+         }
+      }
 
       transportConnection.write(buffer);
    }
@@ -775,11 +801,6 @@
             packet = new SessionSendMessage();
             break;
          }
-         case SESS_RECEIVETOKENS:
-         {
-            packet = new SessionProducerFlowCreditMessage();
-            break;
-         }
          case SESS_RECEIVE_MSG:
          {
             packet = new SessionReceiveMessage();
@@ -1302,6 +1323,8 @@
          {
             throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
          }
+         
+         int sizeToFree = 0;
 
          for (int i = 0; i < numberToClear; i++)
          {
@@ -1324,9 +1347,16 @@
                                                " created active " +
                                                connection.createdActive);
             }
+            
+            sizeToFree += packet.getPacketSize();
          }
 
          firstStoredCommandID += numberToClear;
+         
+         if (connection.sendSemaphore != null)
+         {
+            connection.sendSemaphore.release(sizeToFree);
+         }
       }
 
       private class ReplicatedPacketsConfirmedChannelHandler implements ChannelHandler

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,6 +22,10 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -33,6 +33,8 @@
    private long channelID;
 
    private final byte type;
+   
+   private int size;
 
    // The packet types
    // -----------------------------------------------------------------------------------
@@ -136,22 +138,20 @@
 
    public static final byte SESS_SEND = 75;
 
-   public static final byte SESS_RECEIVETOKENS = 76;
+   public static final byte SESS_CONSUMER_CLOSE = 76;
 
-   public static final byte SESS_CONSUMER_CLOSE = 77;
+   public static final byte SESS_PRODUCER_CLOSE = 77;
 
-   public static final byte SESS_PRODUCER_CLOSE = 78;
+   public static final byte SESS_RECEIVE_MSG = 78;
 
-   public static final byte SESS_RECEIVE_MSG = 79;
+   public static final byte SESS_MANAGEMENT_SEND = 79;
 
-   public static final byte SESS_MANAGEMENT_SEND = 80;
+   public static final byte SESS_SCHEDULED_SEND = 80;
 
-   public static final byte SESS_SCHEDULED_SEND = 81;
+   public static final byte SESS_FAILOVER_COMPLETE = 81;
 
-   public static final byte SESS_FAILOVER_COMPLETE = 82;
+   public static final byte SESS_REPLICATE_DELIVERY = 82;
 
-   public static final byte SESS_REPLICATE_DELIVERY = 83;
-
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -175,8 +175,8 @@
    {
       this.channelID = channelID;
    }
-
-   public void encode(final MessagingBuffer buffer)
+   
+   public int encode(final MessagingBuffer buffer)
    {
       // The standard header fields
       buffer.putInt(0); // The length gets filled in at the end
@@ -191,6 +191,10 @@
       buffer.putInt(0, len);
 
       buffer.flip();
+      
+      size = DataConstants.SIZE_INT + len;
+      
+      return size;
    }
 
    public void decode(final MessagingBuffer buffer)
@@ -199,6 +203,11 @@
 
       decodeBody(buffer);
    }
+   
+   public int getPacketSize()
+   {
+      return size;
+   }
 
    public boolean isResponse()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -38,7 +38,7 @@
    // Attributes ----------------------------------------------------
 
    private int commandID;
-   
+         
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -18,14 +18,13 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
 
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -38,30 +37,26 @@
    // Attributes ----------------------------------------------------
 
    private SimpleString address;
-   
-   private int windowSize;
-   
+
    private int maxRate;
 
    private boolean autoGroupId;
-      
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate, final boolean autoGroupId)
+   public SessionCreateProducerMessage(final SimpleString address, final int maxRate, final boolean autoGroupId)
    {
       super(SESS_CREATEPRODUCER);
-  
+
       this.address = address;
-      
-      this.windowSize = windowSize;
-      
+
       this.maxRate = maxRate;
 
       this.autoGroupId = autoGroupId;
    }
-   
+
    public SessionCreateProducerMessage()
    {
       super(SESS_CREATEPRODUCER);
@@ -74,7 +69,6 @@
    {
       StringBuffer buff = new StringBuffer(getParentString());
       buff.append(", address=" + address);
-      buff.append(", windowSize=" + windowSize);
       buff.append(", maxrate=" + maxRate);
       buff.append(", autoGroupId=" + autoGroupId);
       buff.append("]");
@@ -85,15 +79,10 @@
    {
       return address;
    }
-   
-   public int getWindowSize()
-   {
-   	return windowSize;
-   }
-   
+
    public int getMaxRate()
    {
-   	return maxRate;
+      return maxRate;
    }
 
    public boolean isAutoGroupId()
@@ -104,33 +93,29 @@
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putNullableSimpleString(address);
-      buffer.putInt(windowSize);
       buffer.putInt(maxRate);
       buffer.putBoolean(autoGroupId);
    }
-   
+
    public void decodeBody(final MessagingBuffer buffer)
    {
-      address = buffer.getNullableSimpleString();      
-      windowSize = buffer.getInt();      
+      address = buffer.getNullableSimpleString();
       maxRate = buffer.getInt();
       autoGroupId = buffer.getBoolean();
    }
-   
+
    public boolean equals(Object other)
    {
       if (other instanceof SessionCreateProducerMessage == false)
       {
          return false;
       }
-            
+
       SessionCreateProducerMessage r = (SessionCreateProducerMessage)other;
-      
-      return super.equals(other) &&
-             this.address == null ? r.address == null : this.address.equals(r.address) &&
-             this.windowSize == r.windowSize &&
-             this.maxRate == r.maxRate &&
-             this.autoGroupId == autoGroupId;                  
+
+      return super.equals(other) && this.address == null ? r.address == null
+                                                        : this.address.equals(r.address) && this.maxRate == r.maxRate &&
+                                                          this.autoGroupId == autoGroupId;
    }
 
    // Package protected ---------------------------------------------
@@ -141,4 +126,3 @@
 
    // Inner classes -------------------------------------------------
 }
-

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -35,9 +35,7 @@
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
-
-   private int initialCredits;
-   
+  
    private int maxRate;
 
    private SimpleString autoGroupId;
@@ -46,12 +44,10 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate, final SimpleString autoGroupId)
+   public SessionCreateProducerResponseMessage(final int maxRate, final SimpleString autoGroupId)
    {
       super(SESS_CREATEPRODUCER_RESP);
  
-      this.initialCredits = initialCredits;
-      
       this.maxRate = maxRate;
 
       this.autoGroupId = autoGroupId;
@@ -69,11 +65,6 @@
       return true;
    }
    
-   public int getInitialCredits()
-   {
-   	return initialCredits;
-   }
-   
    public int getMaxRate()
    {
    	return maxRate;
@@ -86,14 +77,12 @@
 
    public void encodeBody(final MessagingBuffer buffer)
    {
-      buffer.putInt(initialCredits);
       buffer.putInt(maxRate);
       buffer.putNullableSimpleString(autoGroupId);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {     
-      initialCredits = buffer.getInt();
       maxRate = buffer.getInt();
       autoGroupId = buffer.getNullableSimpleString();
    }
@@ -103,7 +92,6 @@
    public String toString()
    {
       StringBuffer buf = new StringBuffer(getParentString());
-      buf.append(", initialCredits=" + initialCredits);
       buf.append(", maxRate=" + maxRate);
       buf.append("]");
       return buf.toString();
@@ -118,8 +106,7 @@
             
       SessionCreateProducerResponseMessage r = (SessionCreateProducerResponseMessage)other;
       
-      return super.equals(other) &&
-         this.initialCredits == r.initialCredits &&
+      return super.equals(other) &&       
          this.maxRate == r.maxRate;
       
    }

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,114 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * 
- * A SessionProducerFlowCreditMessage
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class SessionProducerFlowCreditMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private long producerID;
-   
-   private int credits;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionProducerFlowCreditMessage(final long producerID, final int credits)
-   {
-      super(SESS_RECEIVETOKENS);
-
-      this.producerID = producerID;
-      
-      this.credits = credits;
-   }
-   
-   public SessionProducerFlowCreditMessage()
-   {
-      super(SESS_RECEIVETOKENS);
-   }
-
-   // Public --------------------------------------------------------
-
-   public long getProducerID()
-   {
-      return producerID;
-   }
-   
-   public int getTokens()
-   {
-      return credits;
-   }
-   
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putLong(producerID);
-      buffer.putInt(credits);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      producerID = buffer.getLong();
-      credits = buffer.getInt();
-   }
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buf = new StringBuffer(getParentString());
-      buf.append(", producerID=" + producerID + ", credits=" + credits);
-      buf.append("]");
-      return buf.toString();
-   }
-   
-   public boolean equals(Object other)
-   {
-      if (other instanceof SessionProducerFlowCreditMessage == false)
-      {
-         return false;
-      }
-            
-      SessionProducerFlowCreditMessage r = (SessionProducerFlowCreditMessage)other;
-      
-      return super.equals(other) && this.credits == r.credits && this.producerID == r.producerID;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -29,7 +29,6 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -102,10 +101,6 @@
    
    int getMessagesAdded();
 
-   FlowController getFlowController();
-   
-   void setFlowController(FlowController flowController);
-  
    MessageReference removeReferenceWithID(long id);
    
    MessageReference getReference(long id);

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -40,10 +40,4 @@
 	void send(ServerMessage msg) throws Exception;
 
    void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
-	
-	void sendCredits(int credits) throws Exception;
-	
-	void setWaiting(boolean waiting);
-	
-	boolean isWaiting();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,6 +22,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -260,8 +261,8 @@
                                                                   backupConnectorParams,
                                                                   5000,
                                                                   30000,
-                                                                  1,
-                                                                  5);
+                                                                  ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                                  ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE);
       }
       remotingService.setMessagingServer(this);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -28,7 +28,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
@@ -89,8 +88,6 @@
 
    private AtomicInteger deliveringCount = new AtomicInteger(0);
 
-   private volatile FlowController flowController;
-
    private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
 
    private final Runnable deliverRunner = new DeliverRunner();
@@ -348,16 +345,7 @@
       return messagesAdded.get();
    }
 
-   public void setFlowController(final FlowController flowController)
-   {
-      this.flowController = flowController;
-   }
 
-   public FlowController getFlowController()
-   {
-      return flowController;
-   }
-
    public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager, postOffice);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,15 +22,10 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -48,37 +43,13 @@
 	
 	private final ServerSession session;
 	
-	private final SimpleString address;
-	
-	private final FlowController flowController;
-	
-	private final int windowSize;
-	
-	private volatile boolean waiting;
-	
-   private AtomicInteger creditsToSend = new AtomicInteger(0);
-   
-   private final Channel channel;
-     	
 	// Constructors ----------------------------------------------------------------
 	
-	public ServerProducerImpl(final long id, final ServerSession session,
-	                          final SimpleString address, 
-			                    final FlowController flowController,
-			                    final int windowSize,			                    
-			                    final Channel channel) throws Exception
+	public ServerProducerImpl(final long id, final ServerSession session) throws Exception
 	{	
 	   this.id = id;
 	   
 		this.session = session;
-		
-		this.address = address;
-		
-		this.flowController = flowController;		
-		
-		this.windowSize = windowSize;
-		
-		this.channel = channel;
 	}
 	
 	// ServerProducer implementation --------------------------------------------
@@ -95,62 +66,11 @@
 	
 	public void send(final ServerMessage message) throws Exception
 	{
-      doFlowControl(message);
-
       session.send(message);  		
 	}
 	
 	public void sendScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
    {
-      doFlowControl(message);
-
       session.sendScheduled(message, scheduledDeliveryTime);
    }
-	
-   public void requestAndSendCredits() throws Exception
-	{	 
-	   if (!waiting)
-	   {
-	      flowController.requestAndSendCredits(this, creditsToSend.get());
-	   }
-	}
-
-	public void sendCredits(final int credits) throws Exception
-	{
-	   creditsToSend.addAndGet(-credits);
-	   
-//		Packet packet = new SessionProducerFlowCreditMessage(id, credits);
-//		
-//		channel.send(packet);	
-	}
-	
-	public void setWaiting(final boolean waiting)
-	{
-		this.waiting = waiting;
-	}
-	
-	public boolean isWaiting()
-	{
-		return waiting;
-	}
-
-
-
-   private void doFlowControl(final ServerMessage message) throws Exception
-   {
-      if (this.address != null)
-      {
-         //Only do flow control with non anonymous producers
-
-         if (flowController != null)
-         {
-            int creds = creditsToSend.addAndGet(message.getEncodeSize());
-
-            if (creds >= windowSize)
-            {
-               requestAndSendCredits();
-            }
-         }
-      }
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -38,7 +38,6 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.DelayedResult;
@@ -855,44 +854,19 @@
 
       int maxRate = packet.getMaxRate();
 
-      int windowSize = packet.getWindowSize();
-
       boolean autoGroupID = packet.isAutoGroupId();
 
       Packet response = null;
 
       try
       {
-         FlowController flowController = null;
-
          final int maxRateToUse = maxRate;
 
-         if (address != null)
-         {
-            flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
-         }
-
-         final int windowToUse = flowController == null ? -1 : windowSize;
-
-         // Server window size is 0.75 client window size for producer flow control
-         // (other way round to consumer flow control)
-
-         final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
-
          ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
-                                                              this,
-                                                              address,
-                                                              flowController,
-                                                              serverWindowSize,
-                                                              channel);
+                                                              this);
 
          producers.put(producer.getID(), producer);
 
-         // Get some initial credits to send to the producer - we try for
-         // windowToUse
-
-         int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
-
          SimpleString groupId = null;
 
          if (autoGroupID)
@@ -900,7 +874,7 @@
             groupId = simpleStringIdGenerator.generateID();
          }
 
-         response = new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
+         response = new SessionCreateProducerResponseMessage(maxRateToUse, groupId);
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -72,15 +72,13 @@
 
    private final long pingPeriod;
    
-   private final int pingPoolSize;
-
    private final long callTimeout;
 
    private final int consumerWindowSize;
 
    private final int consumerMaxRate;
 
-   private final int producerWindowSize;
+   private final int sendWindowSize;
 
    private final int producerMaxRate;
 
@@ -98,14 +96,13 @@
 
    public JBossConnectionFactory(final TransportConfiguration connectorConfig,
                                  final TransportConfiguration backupConnectorConfig,
-                                 final long pingPeriod,
-                                 final int pingPoolSize,
+                                 final long pingPeriod,                         
                                  final long callTimeout,
                                  final String clientID,
                                  final int dupsOKBatchSize,
                                  final int consumerWindowSize,
                                  final int consumerMaxRate,
-                                 final int producerWindowSize,
+                                 final int sendWindowSize,
                                  final int producerMaxRate,
                                  final boolean blockOnAcknowledge,
                                  final boolean blockOnNonPersistentSend,
@@ -118,12 +115,11 @@
       this.clientID = clientID;
       this.dupsOKBatchSize = dupsOKBatchSize;
       this.pingPeriod = pingPeriod;
-      this.pingPoolSize = pingPoolSize;
       this.callTimeout = callTimeout;
       this.consumerMaxRate = consumerMaxRate;
       this.consumerWindowSize = consumerWindowSize;
       this.producerMaxRate = producerMaxRate;
-      this.producerWindowSize = producerWindowSize;
+      this.sendWindowSize = sendWindowSize;
       this.blockOnAcknowledge = blockOnAcknowledge;
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
       this.blockOnPersistentSend = blockOnPersistentSend;
@@ -225,11 +221,6 @@
       return pingPeriod;
    }
    
-   public int getPingPoolSize()
-   {
-      return pingPoolSize;
-   }
-
    public long getCallTimeout()
    {
       return callTimeout;
@@ -257,7 +248,7 @@
 
    public int getProducerWindowSize()
    {
-      return producerWindowSize;
+      return sendWindowSize;
    }
 
    public int getProducerMaxRate()
@@ -285,7 +276,7 @@
       return autoGroupId;
    }
 
-// Package protected ----------------------------------------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
 
@@ -299,12 +290,11 @@
          // It doesn't matter if more than one is created due to a race
          sessionFactory = new ClientSessionFactoryImpl(connectorConfig,
                                                        backupConnectorConfig,
-                                                       pingPeriod,
-                                                       pingPoolSize,
+                                                       pingPeriod,                                                 
                                                        callTimeout,
                                                        consumerWindowSize,
                                                        consumerMaxRate,
-                                                       producerWindowSize,
+                                                       sendWindowSize,
                                                        producerMaxRate,
                                                        blockOnAcknowledge,
                                                        blockOnNonPersistentSend,

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -108,14 +108,13 @@
    boolean createConnectionFactory(String name,
                                    TransportConfiguration connectorConfig,
                                    TransportConfiguration backupConnectorConfig,
-                                   long pingPeriod,
-                                   int pingPoolSize,
+                                   long pingPeriod,                                
                                    long callTimeout,
                                    String clientID,
                                    int dupsOKBatchSize,
                                    int consumerWindowSize,
                                    int consumerMaxRate,
-                                   int producerWindowSize,
+                                   int sendWindowSize,
                                    int producerMaxRate,
                                    boolean blockOnAcknowledge,
                                    boolean blockOnNonPersistentSend,
@@ -127,14 +126,13 @@
    boolean createConnectionFactory(String name,
                                    TransportConfiguration connectorConfig,
                                    TransportConfiguration backupConnectorConfig,
-                                   long pingPeriod,
-                                   int pingPoolSize,
+                                   long pingPeriod,                              
                                    long callTimeout,
                                    String clientID,
                                    int dupsOKBatchSize,
                                    int consumerWindowSize,
                                    int consumerMaxRate,
-                                   int producerWindowSize,
+                                   int sendWindowSize,
                                    int producerMaxRate,
                                    boolean blockOnAcknowledge,
                                    boolean blockOnNonPersistentSend,

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -44,8 +44,6 @@
 
    private static final String PING_PERIOD_ELEMENT = "ping-period";
    
-   private static final String PING_POOL_SIZE_ELEMENT = "ping-pool-size";
-
    private static final String CALL_TIMEOUT_ELEMENT = "call-timeout";
 
    private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
@@ -131,14 +129,13 @@
       {
          NodeList children = node.getChildNodes();
 
-         long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
-         int pingPoolSize = ClientSessionFactoryImpl.DEFAULT_PING_POOL_SIZE;
+         long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;     
          long callTimeout = ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
          String clientID = null;
          int dupsOKBatchSize = DEFAULT_DUPS_OK_BATCH_SIZE;
          int consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
          int consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-         int producerWindowSize = ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+         int sendWindowSize = ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
          int producerMaxRate = ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
          boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
          boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
@@ -157,10 +154,6 @@
             {
                pingPeriod = Long.parseLong(children.item(j).getTextContent().trim());
             }
-            else if (PING_POOL_SIZE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
-            {
-               pingPoolSize = Integer.parseInt(children.item(j).getTextContent().trim());
-            }
             else if (CALL_TIMEOUT_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
             {
                callTimeout = Long.parseLong(children.item(j).getTextContent().trim());
@@ -175,7 +168,7 @@
             }
             else if (PRODUCER_WINDOW_SIZE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
             {
-               producerWindowSize = Integer.parseInt(children.item(j).getTextContent().trim());
+               sendWindowSize = Integer.parseInt(children.item(j).getTextContent().trim());
             }
             else if (PRODUCER_MAX_RATE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
             {
@@ -401,14 +394,13 @@
          jmsServerManager.createConnectionFactory(name,
                                                   connectorConfig,
                                                   backupConnectorConfig,
-                                                  pingPeriod,
-                                                  pingPoolSize,
+                                                  pingPeriod,                                      
                                                   callTimeout,
                                                   clientID,
                                                   dupsOKBatchSize,
                                                   consumerWindowSize,
                                                   consumerMaxRate,
-                                                  producerWindowSize,
+                                                  sendWindowSize,
                                                   producerMaxRate,
                                                   blockOnAcknowledge,
                                                   blockOnNonPersistentSend,

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -189,14 +189,13 @@
    public boolean createConnectionFactory(String name,
                                           TransportConfiguration connectorConfig,
                                           TransportConfiguration backupConnectorConfig,
-                                          long pingPeriod,
-                                          int pingPoolSize,
+                                          long pingPeriod,                                        
                                           long callTimeout,
                                           String clientID,
                                           int dupsOKBatchSize,
                                           int consumerWindowSize,
                                           int consumerMaxRate,
-                                          int producerWindowSize,
+                                          int sendWindowSize,
                                           int producerMaxRate,
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
@@ -210,14 +209,13 @@
       {
          cf = new JBossConnectionFactory(connectorConfig,
                                          backupConnectorConfig,
-                                         pingPeriod,
-                                         pingPoolSize,
+                                         pingPeriod,                                      
                                          callTimeout,
                                          clientID,
                                          dupsOKBatchSize,
                                          consumerWindowSize,
                                          consumerMaxRate,
-                                         producerWindowSize,
+                                         sendWindowSize,
                                          producerMaxRate,
                                          blockOnAcknowledge,
                                          blockOnNonPersistentSend,
@@ -246,14 +244,13 @@
    public boolean createConnectionFactory(String name,
                                           TransportConfiguration connectorConfig,
                                           TransportConfiguration backupConnectorConfig,
-                                          long pingPeriod,
-                                          int pingPoolSize,
+                                          long pingPeriod,                                         
                                           long callTimeout,
                                           String clientID,
                                           int dupsOKBatchSize,
                                           int consumerWindowSize,
                                           int consumerMaxRate,
-                                          int producerWindowSize,
+                                          int sendWindowSize,
                                           int producerMaxRate,
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
@@ -267,14 +264,13 @@
       {
          cf = new JBossConnectionFactory(connectorConfig,
                                          backupConnectorConfig,
-                                         pingPeriod,
-                                         pingPoolSize,
+                                         pingPeriod,                           
                                          callTimeout,
                                          clientID,
                                          dupsOKBatchSize,
                                          consumerWindowSize,
                                          consumerMaxRate,
-                                         producerWindowSize,
+                                         sendWindowSize,
                                          producerMaxRate,
                                          blockOnAcknowledge,
                                          blockOnNonPersistentSend,

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -71,8 +71,6 @@
                                 TransportConfiguration backupConnectorConfig,
                                 @Parameter(name = "pingPeriod", desc = "The ping period in m")
                                 long pingPeriod,
-                                @Parameter(name = "pingPoolSize", desc = "The max size of thread pool used for pinging")
-                                int pingPoolSize,
                                 @Parameter(name = "callTimeout", desc = "The call timeout in m")
                                 long callTimeout,
                                 @Parameter(name = "clientID", desc = "ClientID for created connections")

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -79,8 +79,7 @@
    public void createConnectionFactory(String name,
                                        TransportConfiguration connectorConfig,
                                        TransportConfiguration backupConnectorConfig,
-                                       long pingPeriod,
-                                       int pingPoolSize,
+                                       long pingPeriod,                                  
                                        long callTimeout,
                                        String clientID,
                                        int dupsOKBatchSize,
@@ -101,8 +100,7 @@
       boolean created = server.createConnectionFactory(name,
                                                        connectorConfig,
                                                        backupConnectorConfig,
-                                                       pingPeriod,
-                                                       pingPoolSize,
+                                                       pingPeriod,                                                    
                                                        callTimeout,
                                                        clientID,
                                                        dupsOKBatchSize,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -89,14 +89,13 @@
          getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
                                                        new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
                                                        null,
+                                                       5000,                                                       
                                                        5000,
-                                                       5,
-                                                       5000,
                                                        null,
                                                        1000,
                                                        1024 * 1024,
                                                        -1,
-                                                       1000,
+                                                       1024 * 1024,
                                                        -1,
                                                        true,
                                                        true,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -39,14 +39,13 @@
       getJmsServerManager().createConnectionFactory("testsuitecf",
                                                     new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
                                                     null,
+                                                    5000,                                                    
                                                     5000,
-                                                    5,
-                                                    5000,
                                                     null,
                                                     1000,
                                                     1024 * 1024,
                                                     -1,
-                                                    1000,
+                                                    1024 * 1024,
                                                     -1,
                                                     true,
                                                     true,
@@ -54,7 +53,7 @@
                                                     false,
                                                     8,
                                                     "/testsuitecf");
-
+      
       cf = (JBossConnectionFactory)getInitialContext().lookup("/testsuitecf");
    }
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -537,14 +537,13 @@
       getJMSServerManager().createConnectionFactory(objectName,
                                                     new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
                                                     null,
+                                                    5000,                                                    
                                                     5000,
-                                                    5,
-                                                    5000,
                                                     clientId,
                                                     dupsOkBatchSize,
                                                     prefetchSize,
                                                     -1,
-                                                    1000,
+                                                    1024 * 1024,
                                                     -1,
                                                     blockOnAcknowledge,
                                                     true,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -29,9 +29,8 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_POOL_SIZE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
 import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
 import junit.framework.TestCase;
 
@@ -157,11 +156,10 @@
       sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
            null,
            2000,
-           DEFAULT_PING_POOL_SIZE,
            DEFAULT_CALL_TIMEOUT,
            DEFAULT_CONSUMER_WINDOW_SIZE,
            DEFAULT_CONSUMER_MAX_RATE,
-           DEFAULT_PRODUCER_WINDOW_SIZE,
+           DEFAULT_SEND_WINDOW_SIZE,
            DEFAULT_PRODUCER_MAX_RATE,
            DEFAULT_BLOCK_ON_ACKNOWLEDGE,
            DEFAULT_BLOCK_ON_PERSISTENT_SEND,

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -30,7 +30,6 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
@@ -89,11 +88,6 @@
       return null;
    }
 
-   public FlowController getFlowController(SimpleString address)
-   {
-      return null;
-   }
-
    public Map<SimpleString, List<Binding>> getMappings()
    {
       return null;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -189,7 +189,7 @@
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT, cf.getCallTimeout());
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, cf.getConsumerWindowSize());
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, cf.getConsumerMaxRate());
-//      assertEquals(ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, cf.getProducerWindowSize());
+//      assertEquals(ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE, cf.getProducerWindowSize());
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, cf.getProducerMaxRate());
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isBlockOnAcknowledge());
 //      assertEquals(ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isBlockOnNonPersistentSend());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -36,7 +36,6 @@
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.BindingImpl;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
@@ -120,7 +119,6 @@
       EasyMock.expect(binding.getQueue()).andStubReturn(queue);
       SimpleString queueName = new SimpleString("testQueueName1");
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController(null);
       EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -164,7 +162,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController(null);
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
       }
@@ -217,7 +214,6 @@
       EasyMock.expect(binding2.getQueue()).andStubReturn(queue);
       SimpleString queueName = new SimpleString("testQueueName1");
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController(null);
       EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
 
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -262,7 +258,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(address);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController(null);
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
       }
@@ -314,7 +309,6 @@
       EasyMock.expect(binding.getQueue()).andStubReturn(queue);
       SimpleString queueName = new SimpleString("testQueueName1");
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
 
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -378,7 +372,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController((FlowController)EasyMock.anyObject());
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
          dests.add(addresses[i]);
@@ -439,7 +432,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController((FlowController)EasyMock.anyObject());
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
          dests.add(addresses[i]);
@@ -502,7 +494,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController((FlowController)EasyMock.anyObject());
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
          dests.add(addresses[i]);
@@ -525,11 +516,6 @@
       EasyMock.verify(pm, qf, pgm, pgstore);
 
       assertTrue(postOffice.isStarted());
-      for (int i = 0; i < 100; i++)
-      {
-         FlowController flowController = postOffice.getFlowController(addresses[i]);
-         assertNotNull(flowController);
-      }
    }
 
    public void testListDestinations() throws Exception
@@ -564,7 +550,6 @@
          EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
          EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
          EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
-         queues[i].setFlowController((FlowController)EasyMock.anyObject());
          EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
          EasyMock.replay(bindings[i], queues[i]);
          dests.add(addresses[i]);
@@ -793,7 +778,6 @@
       EasyMock.replay(pm, qf);
       postOffice.start();
       assertTrue(postOffice.addDestination(address, true));
-      assertNotNull(postOffice.getFlowController(address));
       assertTrue(postOffice.containsDestination(address));
       EasyMock.verify(pm, qf);
    }
@@ -825,9 +809,6 @@
       assertTrue(postOffice.addDestination(address, true));
       assertTrue(postOffice.addDestination(address2, true));
       assertTrue(postOffice.addDestination(address3, true));
-      assertNotNull(postOffice.getFlowController(address));
-      assertNotNull(postOffice.getFlowController(address2));
-      assertNotNull(postOffice.getFlowController(address3));
       assertTrue(postOffice.containsDestination(address));
       assertTrue(postOffice.containsDestination(address2));
       assertTrue(postOffice.containsDestination(address3));
@@ -858,7 +839,6 @@
       assertTrue(postOffice.addDestination(address, true));
       assertTrue(postOffice.containsDestination(address));
       postOffice.removeDestination(address, true);
-      assertNull(postOffice.getFlowController(address));
       assertFalse(postOffice.containsDestination(address));
       EasyMock.verify(pm, qf);
    }
@@ -892,19 +872,13 @@
       assertTrue(postOffice.addDestination(address, true));
       assertTrue(postOffice.addDestination(address2, true));
       assertTrue(postOffice.addDestination(address3, true));
-      assertNotNull(postOffice.getFlowController(address));
-      assertNotNull(postOffice.getFlowController(address2));
-      assertNotNull(postOffice.getFlowController(address3));
       assertTrue(postOffice.containsDestination(address));
       assertTrue(postOffice.containsDestination(address2));
       assertTrue(postOffice.containsDestination(address3));
       postOffice.removeDestination(address, true);
       postOffice.removeDestination(address3, true);
-      assertNull(postOffice.getFlowController(address));
       assertFalse(postOffice.containsDestination(address));
-      assertNotNull(postOffice.getFlowController(address2));
       assertTrue(postOffice.containsDestination(address2));
-      assertNull(postOffice.getFlowController(address3));
       assertFalse(postOffice.containsDestination(address3));
       EasyMock.verify(pm, qf);
    }
@@ -929,7 +903,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
@@ -965,11 +938,8 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
@@ -1005,7 +975,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
 
@@ -1040,11 +1009,8 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue, queue2, queue3);
       postOffice.start();
 
@@ -1078,7 +1044,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
@@ -1117,11 +1082,9 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
       pm.deleteBinding((Binding)EasyMock.anyObject());
-      queue.setFlowController(null);
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
 
@@ -1157,20 +1120,15 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.deleteBinding((Binding)EasyMock.anyObject());
       pm.deleteBinding((Binding)EasyMock.anyObject());
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
-      queue.setFlowController(null);
       EasyMock.expect(queue3.isDurable()).andStubReturn(true);
-      queue3.setFlowController(null);
       EasyMock.replay(pm, qf, queue, queue2, queue3);
       postOffice.start();
 
@@ -1205,9 +1163,7 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue.isDurable()).andStubReturn(false);
-      queue.setFlowController(null);
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
 
@@ -1243,16 +1199,11 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.expect(queue.isDurable()).andStubReturn(false);
-      queue.setFlowController(null);
       EasyMock.expect(queue3.isDurable()).andStubReturn(false);
-      queue3.setFlowController(null);
       EasyMock.replay(pm, qf, queue, queue2, queue3);
       postOffice.start();
 
@@ -1385,7 +1336,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
 
@@ -1434,7 +1384,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.replay(pm, pgm, qf, message, queue);
 
@@ -1474,7 +1423,6 @@
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(filter);
       EasyMock.expect(filter.match(message)).andReturn(true);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
       EasyMock.replay(pm, qf, message, queue, messageReference, filter);
       postOffice.start();
@@ -1512,7 +1460,6 @@
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(filter);
       EasyMock.expect(filter.match(message)).andReturn(false);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, message, queue, messageReference, filter);
       postOffice.start();
       postOffice.addBinding(address, queueName, filter, false, false);
@@ -1558,9 +1505,6 @@
       EasyMock.expect(queue2.getFilter()).andStubReturn(null);
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
       EasyMock.expect(queue3.getFilter()).andStubReturn(null);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
       EasyMock.expect(message.createReference(queue2)).andReturn(messageReference2);
       EasyMock.expect(message.createReference(queue3)).andReturn(messageReference3);
@@ -1618,9 +1562,6 @@
       EasyMock.expect(queue3.getFilter()).andStubReturn(filter2);
       EasyMock.expect(filter.match(message)).andReturn(false);
       EasyMock.expect(filter2.match(message)).andReturn(true);
-      queue.setFlowController((FlowController)EasyMock.anyObject());
-      queue2.setFlowController((FlowController)EasyMock.anyObject());
-      queue3.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(message.createReference(queue2)).andReturn(messageReference2);
       EasyMock.expect(message.createReference(queue3)).andReturn(messageReference3);
       EasyMock.replay(pm, qf, message, queue, queue2, queue3, messageReference, filter, filter2);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-10-30 20:13:20 UTC (rev 5228)
@@ -81,7 +81,6 @@
       EasyMock.expect(message3.getDestination()).andStubReturn(address3);
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setFlowController(null);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
       EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
       //this bit is the test itself, if the reference is created for each queue thenwe know that they have been routed via all 3 queues
@@ -93,9 +92,6 @@
       assertTrue(postOffice.addDestination(address, true));
       assertTrue(postOffice.addDestination(address2, true));
       assertTrue(postOffice.addDestination(address3, true));
-      assertNotNull(postOffice.getFlowController(address));
-      assertNotNull(postOffice.getFlowController(address2));
-      assertNotNull(postOffice.getFlowController(address3));
       assertTrue(postOffice.containsDestination(address));
       assertTrue(postOffice.containsDestination(address2));
       assertTrue(postOffice.containsDestination(address3));
@@ -147,12 +143,10 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, null, false, false)).andReturn(queue2);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue.setFlowController(null);
-      queue2.setFlowController(null);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
       EasyMock.expect(queue2.getFilter()).andStubReturn(null);
       EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
-      //this bit is the test itself, if the reference is created for each queue thenwe know that they have been routed via all 3 queues
+      //this bit is the test itself, if the reference is created for each queue then we know that they have been routed via all 3 queues
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
       EasyMock.expect(message2.createReference(queue2)).andReturn(messageReference2);
       EasyMock.expect(message3.createReference(queue)).andReturn(messageReference3);




More information about the jboss-cvs-commits mailing list