[hornetq-commits] JBoss hornetq SVN: r8998 - in trunk: src/config/jboss-6/clustered and 32 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 29 09:35:31 EDT 2010


Author: timfox
Date: 2010-03-29 09:35:27 -0400 (Mon, 29 Mar 2010)
New Revision: 8998

Added:
   trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java
Removed:
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/
Modified:
   trunk/src/config/common/schema/hornetq-configuration.xsd
   trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
   trunk/src/config/trunk/clustered/hornetq-configuration.xml
   trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
   trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
   trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/config/Configuration.java
   trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
   trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
   trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
   trunk/tests/config/ConfigurationTest-full-config.xml
   trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
Log:
batching optimisation

Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd	2010-03-29 13:35:27 UTC (rev 8998)
@@ -291,7 +291,7 @@
 		<xsd:sequence>
 			<xsd:element maxOccurs="1" minOccurs="1" name="queue-name" type="xsd:IDREF">
 			</xsd:element>
-			<xsd:element maxOccurs="1" minOccurs="1" name="forwarding-address" type="xsd:string">
+			<xsd:element maxOccurs="1" minOccurs="0" name="forwarding-address" type="xsd:string">
 			</xsd:element>
             <xsd:element maxOccurs="1" minOccurs="0" name="filter">
                 <xsd:complexType>

Modified: trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-6/clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -22,6 +22,13 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
 
       <connector name="in-vm">
          <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -35,6 +42,13 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
 
       <acceptor name="in-vm">
         <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

Modified: trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
 
       <connector name="in-vm">
          <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -33,6 +40,13 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
 
       <acceptor name="in-vm">
         <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -22,6 +22,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
 
       <connector name="in-vm">
          <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -35,6 +42,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
 
       <acceptor name="in-vm">
         <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
 
       <connector name="in-vm">
          <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -33,6 +40,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
 
       <acceptor name="in-vm">
         <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -20,6 +20,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
    </connectors>
 
    <acceptors>
@@ -28,6 +35,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
    </acceptors>
 
    <broadcast-groups>

Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -18,6 +18,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </connector>
+      
+      <connector name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </connector>
    </connectors>
 
    <acceptors>
@@ -26,6 +33,13 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
       </acceptor>
+      
+      <acceptor name="netty-throughput">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="host"  value="${jboss.bind.address:localhost}"/>
+         <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+         <param key="batch-delay" value="50"/>
+      </acceptor>
    </acceptors>
 
    <security-settings>

Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -12,6 +12,13 @@
 			<param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
 			<param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
 		</connector>
+		
+		<connector name="netty-throughput">
+            <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+            <param key="host"  value="${jboss.bind.address:localhost}"/>
+            <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+            <param key="batch-delay" value="50"/>
+      </connector>
 	</connectors>
 	
 	<acceptors>
@@ -20,6 +27,13 @@
 			<param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
 			<param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
 		</acceptor>
+		
+		<acceptor name="netty-throughput">
+            <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+            <param key="host"  value="${jboss.bind.address:localhost}"/>
+            <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+            <param key="batch-delay" value="50"/>
+      </acceptor>
 	</acceptors>
 	
 	<broadcast-groups>

Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -10,6 +10,13 @@
 			<param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
 			<param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
 		</connector>
+		
+		<connector name="netty-throughput">
+            <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+            <param key="host"  value="${jboss.bind.address:localhost}"/>
+            <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+            <param key="batch-delay" value="50"/>
+      </connector>
 	</connectors>
 	
 	<acceptors>
@@ -18,6 +25,13 @@
 			<param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
 			<param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
 		</acceptor>
+		
+        <acceptor name="netty-throughput">
+            <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+            <param key="host"  value="${jboss.bind.address:localhost}"/>
+            <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
+            <param key="batch-delay" value="50"/>
+        </acceptor>
 	</acceptors>
 	
 	<security-settings>

Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -724,7 +724,7 @@
     * @param size initial size of messages created through this factory.
     */
    void setInitialMessagePacketSize(int size);
-
+   
    /**
     * Adds an interceptor which will be executed <em>after packets are received from the server</em>.
     * 

Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -89,7 +89,7 @@
    public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
 
    public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
-
+   
    /**
     * Creates a ClientSessionFactory using all the defaults.
     *

Modified: trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -13,18 +13,15 @@
 
 package org.hornetq.api.core.management;
 
+import java.util.Set;
+
 import javax.management.MBeanOperationInfo;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.core.security.Role;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * A HornetQServerControl is used to manage HornetQ servers.
  */

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -274,7 +274,7 @@
          }
          else
          {
-            channel.send(packet);
+            channel.sendBatched(packet);
          }
       }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -153,7 +153,7 @@
    private static ScheduledExecutorService globalScheduledThreadPool;
 
    private String groupID;
-
+   
    private static synchronized ExecutorService getGlobalThreadPool()
    {
       if (ClientSessionFactoryImpl.globalThreadPool == null)
@@ -244,7 +244,7 @@
                                                             retryInterval,
                                                             retryIntervalMultiplier,
                                                             maxRetryInterval,
-                                                            reconnectAttempts,
+                                                            reconnectAttempts,                                                        
                                                             threadPool,
                                                             scheduledThreadPool,
                                                             interceptors);
@@ -778,7 +778,7 @@
       checkWrite();
       initialMessagePacketSize = size;
    }
-
+   
    public ClientSession createSession(final String username,
                                       final String password,
                                       final boolean xa,
@@ -984,7 +984,7 @@
                                                                       retryInterval,
                                                                       retryIntervalMultiplier,
                                                                       maxRetryInterval,
-                                                                      reconnectAttempts,
+                                                                      reconnectAttempts,                                                                    
                                                                       threadPool,
                                                                       scheduledThreadPool,
                                                                       interceptors);

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -154,7 +154,7 @@
 
    private final int minLargeMessageSize;
 
-   private final int initialMessagePacketSize;
+   private volatile int initialMessagePacketSize;
 
    private final boolean cacheLargeMessageClient;
 
@@ -989,7 +989,7 @@
 
                   HornetQBuffer buffer = packet.encode(channel.getConnection());
 
-                  conn.write(buffer, false);
+                  conn.write(buffer, false, false);
                }
 
                resetCreditManager = true;
@@ -1040,6 +1040,16 @@
          }
       }
    }
+   
+   
+   
+   public void setPacketSize(final int packetSize)
+   {
+      if (packetSize > this.initialMessagePacketSize)
+      {
+         this.initialMessagePacketSize = (int)(packetSize * 1.2);
+      }
+   }
 
    private void sendPacketWithoutLock(final Packet packet)
    {
@@ -1049,7 +1059,7 @@
 
       HornetQBuffer buffer = packet.encode(channel.getConnection());
 
-      conn.write(buffer, false);
+      conn.write(buffer, false, false);
    }
 
    public void workDone()

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -81,4 +81,6 @@
    ClientProducerCreditManager getProducerCreditManager();
    
    void setAddress(Message message, SimpleString address);
+   
+   void setPacketSize(int packetSize);
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -545,4 +545,9 @@
    {
       session.setAddress(message, address);
    }
+
+   public void setPacketSize(int packetSize)
+   {
+      session.setPacketSize(packetSize);
+   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -75,7 +75,7 @@
    private static final long serialVersionUID = 2512460695662741413L;
 
    private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
-
+   
    // debug
 
    private static Map<TransportConfiguration, Set<CoreRemotingConnection>> debugConns;
@@ -199,7 +199,7 @@
    private volatile boolean exitLoop;
 
    private final List<Interceptor> interceptors;
-
+   
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -383,7 +383,7 @@
                   {
                      theConnection.destroy();
                   }
-                  
+
                   if (e.getCode() == HornetQException.UNBLOCKED)
                   {
                      // This means the thread was blocked on create session and failover unblocked it
@@ -440,6 +440,7 @@
             }
             catch (Throwable t)
             {
+               t.printStackTrace();
                if (lock != null)
                {
                   lock.unlock();
@@ -552,7 +553,7 @@
    private void failoverOrReconnect(final Object connectionID, final HornetQException me)
    {
       Set<ClientSessionInternal> sessionsToClose = null;
-      
+
       synchronized (failoverLock)
       {
          if (connection == null || connection.getID() != connectionID)
@@ -561,7 +562,7 @@
             // over then a async connection exception or disconnect
             // came in for one of the already exitLoop connections, so we return true - we don't want to call the
             // listeners again
-            
+
             return;
          }
 
@@ -609,7 +610,7 @@
          {
             attemptReconnect = reconnectAttempts != 0;
          }
-         
+
          if (attemptFailover || attemptReconnect)
          {
             lockChannel1();
@@ -664,7 +665,7 @@
             {
             }
 
-            cancelPinger();
+            cancelScheduledTasks();
 
             connector = null;
 
@@ -853,7 +854,7 @@
       }
    }
 
-   private void cancelPinger()
+   private void cancelScheduledTasks()
    {
       if (pingerFuture != null)
       {
@@ -871,7 +872,7 @@
    {
       if (connection != null && sessions.size() == 0)
       {
-         cancelPinger();
+         cancelScheduledTasks();
 
          try
          {
@@ -905,7 +906,7 @@
          Connection tc = null;
 
          try
-         {            
+         {
             DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
             connector = connectorFactory.createConnector(transportParams,
@@ -918,7 +919,7 @@
             if (connector != null)
             {
                connector.start();
-            
+
                tc = connector.createConnection();
 
                if (tc == null)
@@ -986,7 +987,7 @@
             {
                pingRunnable = new PingRunnable();
 
-               pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduled(pingRunnable),
+               pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduledPinger(pingRunnable),
                                                                          0,
                                                                          clientFailureCheckPeriod,
                                                                          TimeUnit.MILLISECONDS);
@@ -1113,11 +1114,11 @@
       }
    }
 
-   private static final class ActualScheduled implements Runnable
+   private static final class ActualScheduledPinger implements Runnable
    {
       private final WeakReference<PingRunnable> pingRunnable;
 
-      ActualScheduled(final PingRunnable runnable)
+      ActualScheduledPinger(final PingRunnable runnable)
       {
          pingRunnable = new WeakReference<PingRunnable>(runnable);
       }
@@ -1134,6 +1135,47 @@
 
    }
 
+   private static final class ActualScheduledBatchFlusher implements Runnable
+   {
+      private final WeakReference<BatchFlushRunnable> batchFlushRunnable;
+
+      ActualScheduledBatchFlusher(final BatchFlushRunnable runnable)
+      {
+         batchFlushRunnable = new WeakReference<BatchFlushRunnable>(runnable);
+      }
+
+      public void run()
+      {
+         BatchFlushRunnable runnable = batchFlushRunnable.get();
+
+         if (runnable != null)
+         {
+            runnable.run();
+         }
+      }
+
+   }
+
+   private final class BatchFlushRunnable implements Runnable
+   {
+      private boolean cancelled;
+
+      public synchronized void run()
+      {
+         if (cancelled)
+         {
+            return;
+         }         
+         //log.info("calling check flush on client");
+         connection.getTransportConnection().checkFlushBatchBuffer();
+      }
+
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
+
    private final class PingRunnable implements Runnable
    {
       private boolean cancelled;
@@ -1186,7 +1228,7 @@
          Channel channel0 = connection.getChannel(0, -1);
 
          channel0.send(ping);
-         
+
          connection.flush();
       }
 

Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -183,7 +183,7 @@
     * Sets whether this server is manageable using JMX or not.
     */
    void setJMXManagementEnabled(boolean enabled);
-
+   
    /**
     * Returns the domain used by JMX MBeans (provided JMX management is enabled).
     * <br>

Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -154,7 +154,7 @@
    public static final int DEFAULT_ID_CACHE_SIZE = 2000;
 
    public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+   
    public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
 
    public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
@@ -220,7 +220,7 @@
    protected String logDelegateFactoryClassName = ConfigurationImpl.DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME;
 
    protected List<String> interceptorClassNames = new ArrayList<String>();
-
+   
    protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
 
    protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
@@ -486,7 +486,7 @@
    {
       this.backupConnectorName = backupConnectorName;
    }
-
+   
    public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
    {
       return groupingHandlerConfiguration;
@@ -1280,6 +1280,7 @@
       {
          return false;
       }
+
       return true;
    }
 

Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -16,7 +16,6 @@
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.net.URL;
-import java.util.Properties;
 
 import org.hornetq.core.deployers.impl.FileConfigurationParser;
 import org.hornetq.core.logging.Logger;

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -183,7 +183,7 @@
    public int getHeadersAndPropertiesEncodeSize()
    {
       return DataConstants.SIZE_LONG + // Message ID
-             /* address */SimpleString.sizeofString(address) +
+             /* address */SimpleString.sizeofNullableString(address) +
              DataConstants./* Type */SIZE_BYTE +
              DataConstants./* Durable */SIZE_BOOLEAN +
              DataConstants./* Expiration */SIZE_LONG +
@@ -191,11 +191,12 @@
              DataConstants./* Priority */SIZE_BYTE +
              /* PropertySize and Properties */properties.getEncodeSize();
    }
+   
 
    public void encodeHeadersAndProperties(final HornetQBuffer buffer)
    {
       buffer.writeLong(messageID);
-      buffer.writeSimpleString(address);
+      buffer.writeNullableSimpleString(address);
       buffer.writeByte(type);
       buffer.writeBoolean(durable);
       buffer.writeLong(expiration);
@@ -207,7 +208,7 @@
    public void decodeHeadersAndProperties(final HornetQBuffer buffer)
    {
       messageID = buffer.readLong();
-      address = buffer.readSimpleString();
+      address = buffer.readNullableSimpleString();
       type = buffer.readByte();
       durable = buffer.readBoolean();
       expiration = buffer.readLong();
@@ -215,7 +216,7 @@
       priority = buffer.readByte();
       properties.decode(buffer);
    }
-
+   
    public HornetQBuffer getBodyBuffer()
    {
       if (bodyBuffer == null)

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -689,6 +689,7 @@
             return;
          }
       }
+      
       runnable.run();
    }
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -88,8 +88,7 @@
    Object getTransferLock();
    
    /**
-    * 
-    * @return the maximum batch size used when batching writes
+    * Called periodically to flush any data in the batch buffer
     */
-   int getMaxBatchSize();
+   void checkFlushBatchBuffer();
 }

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -74,7 +74,7 @@
    private CommandConfirmationHandler commandConfirmationHandler;
 
    private volatile boolean transferring;
-
+   
    public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
    {
       this.connection = connection;
@@ -161,7 +161,7 @@
          packet.setChannelID(id);
 
          HornetQBuffer buffer = packet.encode(connection);
-
+         
          lock.lock();
 
          try
@@ -189,7 +189,7 @@
                resendCache.add(packet);
             }
 
-            connection.getTransportConnection().write(buffer, flush);
+            connection.getTransportConnection().write(buffer, flush, batch);
          }
          finally
          {
@@ -198,9 +198,6 @@
       }
    }
 
-   public void checkFlushBatchBuffer()
-   {      
-   }
    
    public Packet sendBlocking(final Packet packet) throws HornetQException
    {
@@ -245,7 +242,7 @@
                resendCache.add(packet);
             }
 
-            connection.getTransportConnection().write(buffer);
+            connection.getTransportConnection().write(buffer, false, false);
 
             long toWait = connection.getBlockingCallTimeout();
 
@@ -385,8 +382,6 @@
    // Needs to be synchronized since can be called by remoting service timer thread too for timeout flush
    public synchronized void flushConfirmations()
    {
-      checkFlushBatchBuffer();
-
       if (resendCache != null && receivedBytes != 0)
       {
          receivedBytes = 0;
@@ -480,7 +475,7 @@
    {
       final HornetQBuffer buffer = packet.encode(connection);
 
-      connection.getTransportConnection().write(buffer);
+      connection.getTransportConnection().write(buffer, false, false);
    }
 
    private void clearUpTo(final int lastReceivedCommandID)

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -57,12 +57,11 @@
    {
       final Configuration config = server.getConfiguration();
 
-      CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
-                                                             interceptors,
-                                                             config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-                                                                                                                .getExecutor()
-                                                                                                       : null,
-                                                             connection.getBatchingBufferSize());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
+                                                                   interceptors,
+                                                                   config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
+                                                                                                                      .getExecutor()
+                                                                                                             : null);
 
       Channel channel1 = rc.getChannel(1, -1);
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -23,7 +23,6 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.core.logging.Logger;
@@ -34,7 +33,6 @@
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleIDGenerator;
 
 /**
@@ -89,8 +87,6 @@
 
    private final Executor executor;
    
-   private final int maxBatchSize;
-   
    private volatile boolean executing;
 
    // Constructors
@@ -103,7 +99,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, blockingCallTimeout, interceptors, true, null, -1);
+      this(transportConnection, blockingCallTimeout, interceptors, true, null);
    }
 
    /*
@@ -111,19 +107,17 @@
     */
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final List<Interceptor> interceptors,
-                                 final Executor executor,
-                                 final int maxBatchSize)
+                                 final Executor executor)
 
    {
-      this(transportConnection, -1, interceptors, false, executor, maxBatchSize);
+      this(transportConnection, -1, interceptors, false, executor);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean client,
-                                  final Executor executor,
-                                  final int maxBatchSize)
+                                  final Executor executor)
 
    {
       this.transportConnection = transportConnection;
@@ -135,8 +129,6 @@
       this.client = client;
 
       this.executor = executor;
-      
-      this.maxBatchSize = maxBatchSize;
    }
 
    // RemotingConnection implementation
@@ -372,9 +364,9 @@
       }
    }
    
-   public int getMaxBatchSize()
+   public void checkFlushBatchBuffer()
    {
-      return maxBatchSize;
+      transportConnection.checkFlushBatchBuffer();
    }
 
    // Buffer Handler implementation

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -108,7 +108,7 @@
       this.frameDecoder = new StompFrameDecoder();
       this.executor = server.getExecutorFactory().getExecutor();
    }
-
+   
    // ProtocolManager implementation --------------------------------
 
    public ConnectionEntry createConnectionEntry(final Connection connection)
@@ -261,7 +261,7 @@
          try
          {
             HornetQBuffer buffer = frame.toHornetQBuffer();
-            connection.getTransportConnection().write(buffer, false);
+            connection.getTransportConnection().write(buffer, false, false);
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -45,6 +45,8 @@
    private final int serverID;
 
    private final Executor executor;
+   
+   private volatile boolean closing;
 
    public InVMConnection(final int serverID,
                          final BufferHandler handler,
@@ -73,8 +75,6 @@
       listener.connectionCreated(this, ProtocolType.CORE);
    }
 
-   private volatile boolean closing;
-
    public void close()
    {
       if (closing)
@@ -104,13 +104,17 @@
    {
       return id;
    }
+   
+   public void checkFlushBatchBuffer()
+   {
+   }
 
    public void write(final HornetQBuffer buffer)
    {
-      write(buffer, false);
+      write(buffer, false, false);
    }
-
-   public void write(final HornetQBuffer buffer, final boolean flush)
+   
+   public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch)
    {
       final HornetQBuffer copied = buffer.copy(0, buffer.capacity());
 

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -31,7 +31,6 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -89,17 +88,16 @@
    private final ScheduledExecutorService scheduledThreadPool;
 
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
-   
-   private Map<ProtocolType, ProtocolManager> protocolMap = 
-      new ConcurrentHashMap<ProtocolType, ProtocolManager>();
-   
+
+   private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    public RemotingServiceImpl(final Configuration config,
                               final HornetQServer server,
-                              final ManagementService managementService,                            
+                              final ManagementService managementService,
                               final ScheduledExecutorService scheduledThreadPool)
    {
       transportConfigs = config.getAcceptorConfigurations();
@@ -121,13 +119,15 @@
       }
 
       this.config = config;
+
       this.managementService = managementService;
-      
+
       this.scheduledThreadPool = scheduledThreadPool;
-      
-      this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
-      this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
-      this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
+
+      this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
+                                                                                                     interceptors));
+      this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
+                                                                                                       interceptors));
    }
 
    // RemotingService implementation -------------------------------
@@ -138,17 +138,18 @@
       {
          return;
       }
-      
-      //The remoting service maintains it's own thread pool for handling remoting traffic
-      //If OIO each connection will have it's own thread
-      //If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
-      //This needs to be a different thread pool to the main thread pool especially for OIO where we may need
-      //to support many hundreds of connections, but the main thread pool must be kept small for better performance
-      
-      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this), false);
 
+      // The remoting service maintains it's own thread pool for handling remoting traffic
+      // If OIO each connection will have it's own thread
+      // If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
+      // This needs to be a different thread pool to the main thread pool especially for OIO where we may need
+      // to support many hundreds of connections, but the main thread pool must be kept small for better performance
+
+      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
+                                                        false);
+
       threadPool = Executors.newCachedThreadPool(tFactory);
-      
+
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
 
       for (TransportConfiguration info : transportConfigs)
@@ -174,13 +175,15 @@
                   continue;
                }
             }
-            
-            String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
 
+            String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+                                                                          TransportConstants.DEFAULT_PROTOCOL,
+                                                                          info.getParams());
+
             ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
-            
+
             ProtocolManager manager = protocolMap.get(protocol);
-            
+
             Acceptor acceptor = factory.createAcceptor(info.getParams(),
                                                        new DelegatingBufferHandler(manager),
                                                        manager,
@@ -193,7 +196,7 @@
             if (managementService != null)
             {
                acceptor.setNotificationService(managementService);
-               
+
                managementService.registerAcceptor(acceptor, info);
             }
          }
@@ -208,7 +211,7 @@
          a.start();
       }
 
-      //This thread checks connections that need to be closed, and also flushes confirmations
+      // This thread checks connections that need to be closed, and also flushes confirmations
       failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
 
       failureCheckAndFlushThread.start();
@@ -275,11 +278,11 @@
       {
          managementService.unregisterAcceptors();
       }
-      
+
       threadPool.shutdown();
-      
+
       boolean ok = threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-      
+
       if (!ok)
       {
          log.warn("Timed out waiting for remoting thread pool to terminate");
@@ -331,23 +334,23 @@
    {
       return protocolMap.get(protocol);
    }
-   
+
    public void connectionCreated(final Connection connection, final ProtocolType protocol)
    {
       if (server == null)
       {
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
-      
+
       ProtocolManager pmgr = this.getProtocolManager(protocol);
-      
+
       if (pmgr == null)
       {
          throw new IllegalArgumentException("Unknown protocol " + protocol);
       }
 
       ConnectionEntry entry = pmgr.createConnectionEntry(connection);
-      
+
       connections.put(connection.getID(), entry);
 
       if (config.isBackup())
@@ -410,12 +413,12 @@
    private final class DelegatingBufferHandler implements BufferHandler
    {
       private ProtocolManager manager;
-      
+
       DelegatingBufferHandler(final ProtocolManager manager)
       {
          this.manager = manager;
       }
-      
+
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
          ConnectionEntry conn = connections.get(connectionID);
@@ -470,9 +473,9 @@
             for (ConnectionEntry entry : connections.values())
             {
                RemotingConnection conn = entry.connection;
-               
+
                boolean flush = true;
-               
+
                if (entry.ttl != -1)
                {
                   if (now >= entry.lastCheck + entry.ttl)
@@ -480,7 +483,7 @@
                      if (!conn.checkDataReceived())
                      {
                         idsToRemove.add(conn.getID());
-                        
+
                         flush = false;
                      }
                      else
@@ -489,9 +492,9 @@
                      }
                   }
                }
-               
+
                if (flush)
-               {                                   
+               {
                   conn.flush();
                }
             }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -787,7 +787,7 @@
                                      0,
                                      1.0d,
                                      0,
-                                     1,
+                                     1,                                     
                                      threadPool,
                                      scheduledPool,
                                      null);

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -24,6 +24,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
@@ -132,19 +133,25 @@
 
    private final int nioRemotingThreads;
 
-   private final int batchingBufferSize;
-
    private final HttpKeepAliveRunnable httpKeepAliveRunnable;
 
    private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
 
    private final Executor threadPool;
+   
+   private final ScheduledExecutorService scheduledThreadPool;
 
    private NotificationService notificationService;
 
    private VirtualExecutorService bossExecutor;
 
    private boolean paused;
+   
+   private BatchFlusher flusher;
+   
+   private ScheduledFuture<?> batchFlusherFuture;
+   
+   private final long batchDelay;
 
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
@@ -196,10 +203,6 @@
                                                               -1,
                                                               configuration);
 
-      batchingBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME,
-                                                              TransportConstants.DEFAULT_BATCHING_BUFFER_SIZE,
-                                                              configuration);
-
       useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
                                                        TransportConstants.DEFAULT_USE_INVM,
                                                        configuration);
@@ -248,6 +251,12 @@
                                                                 configuration);
 
       this.threadPool = threadPool;
+      
+      this.scheduledThreadPool = scheduledThreadPool;
+      
+      batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
+                                                       TransportConstants.DEFAULT_BATCH_DELAY,
+                                                       configuration);
    }
 
    public synchronized void start() throws Exception
@@ -395,6 +404,13 @@
          Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
          notificationService.sendNotification(notification);
       }
+      
+      if (batchDelay > 0)
+      {
+         flusher = new BatchFlusher();
+         
+         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+      }
 
       NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID);
    }
@@ -424,6 +440,17 @@
       {
          return;
       }
+      
+      if (batchFlusherFuture != null)
+      {
+         batchFlusherFuture.cancel(false);
+         
+         flusher.cancel();
+         
+         flusher = null;
+         
+         batchFlusherFuture = null;
+      }
 
       serverChannelGroup.close().awaitUninterruptibly();
 
@@ -549,7 +576,7 @@
       @Override
       public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
-         new NettyConnection(e.getChannel(), new Listener(), httpEnabled ? -1 : batchingBufferSize);
+         new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0);
 
          SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
          if (sslHandler != null)
@@ -610,4 +637,25 @@
 
       }
    }
+   
+   private class BatchFlusher implements Runnable
+   {
+      private boolean cancelled;
+
+      public synchronized void run()
+      {
+         if (!cancelled)
+         {
+            for (Connection connection : connections.values())
+            {
+               connection.checkFlushBatchBuffer();
+            }
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -14,6 +14,7 @@
 package org.hornetq.integration.transports.netty;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -28,7 +29,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * buhnaflagilibrn
+ * 
  * @version <tt>$Revision$</tt>
  */
 public class NettyConnection implements Connection
@@ -37,6 +38,8 @@
 
    private static final Logger log = Logger.getLogger(NettyConnection.class);
 
+   private static final int BATCHING_BUFFER_SIZE = 8192;
+
    // Attributes ----------------------------------------------------
 
    private final Channel channel;
@@ -44,22 +47,25 @@
    private boolean closed;
 
    private final ConnectionLifeCycleListener listener;
+
+   private final boolean batchingEnabled;
    
-   private final int batchingBufferSize;
+   private HornetQBuffer batchBuffer;
+   
+   private final Object writeLock = new Object();
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener,
-                          final int batchingBufferSize)
+   public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled)
    {
       this.channel = channel;
 
       this.listener = listener;
+
+      this.batchingEnabled = batchingEnabled;
       
-      this.batchingBufferSize = batchingBufferSize;
-
       listener.connectionCreated(this, ProtocolType.CORE);
    }
 
@@ -114,34 +120,90 @@
       return channel.getId();
    }
 
+   // This is called periodically to flush the batch buffer
+   public void checkFlushBatchBuffer()
+   {
+      synchronized (writeLock)
+      {
+         if (!batchingEnabled)
+         {
+            return;
+         }
+
+         if (batchBuffer != null && batchBuffer.readable())
+         {
+            channel.write(batchBuffer.channelBuffer());
+
+            batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+         }
+      }
+   }
+
    public void write(final HornetQBuffer buffer)
    {
-      write(buffer, false);
+      write(buffer, false, false);
    }
 
-   public void write(final HornetQBuffer buffer, final boolean flush)
+   public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
    {
-      ChannelFuture future = channel.write(buffer.channelBuffer());
-
-      if (flush)
+      synchronized (writeLock)
       {
-         while (true)
+         if (batchBuffer == null && batchingEnabled && batched && !flush)
          {
-            try
+            // Lazily create batch buffer
+
+            batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+         }
+
+         if (batchBuffer != null)
+         {
+            batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+            if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
             {
-               boolean ok = future.await(10000);
+               // If the batch buffer is full or it's flush param or not batched then flush the buffer
 
-               if (!ok)
-               {
-                  NettyConnection.log.warn("Timed out waiting for packet to be flushed");
-               }
+               buffer = batchBuffer;
+            }
+            else
+            {
+               return;
+            }
 
-               break;
+            if (!batched || flush)
+            {
+               batchBuffer = null;
             }
-            catch (InterruptedException ignore)
+            else
             {
+               // Create a new buffer
+
+               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
             }
          }
+
+         ChannelFuture future = channel.write(buffer.channelBuffer());
+
+         if (flush)
+         {
+            while (true)
+            {
+               try
+               {
+                  boolean ok = future.await(10000);
+
+                  if (!ok)
+                  {
+                     NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+                  }
+
+                  break;
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+            }
+         }
       }
    }
 
@@ -149,11 +211,6 @@
    {
       return channel.getRemoteAddress().toString();
    }
-   
-   public int getBatchingBufferSize()
-   {
-      return batchingBufferSize;
-   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
@@ -129,6 +130,8 @@
 
    private final int tcpReceiveBufferSize;
 
+   private final long batchDelay;
+
    private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
 
    private final String servletPath;
@@ -140,9 +143,11 @@
    private final ScheduledExecutorService scheduledThreadPool;
 
    private final Executor closeExecutor;
+   
+   private BatchFlusher flusher;
+   
+   private ScheduledFuture<?> batchFlusherFuture;
 
-   private final int batchingBufferSize;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -240,15 +245,15 @@
                                                                 TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
                                                                 configuration);
 
+      batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
+                                                       TransportConstants.DEFAULT_BATCH_DELAY,
+                                                       configuration);
+
       this.closeExecutor = closeExecutor;
 
       virtualExecutor = new VirtualExecutorService(threadPool);
 
       this.scheduledThreadPool = scheduledThreadPool;
-
-      batchingBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME,
-                                                              TransportConstants.DEFAULT_BATCHING_BUFFER_SIZE,
-                                                              configuration);
    }
 
    public synchronized void start()
@@ -363,6 +368,14 @@
             return pipeline;
          }
       });
+
+      if (batchDelay > 0)
+      {
+         flusher = new BatchFlusher();
+         
+         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+      }
+
       if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()))
       {
          NettyConnector.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
@@ -372,13 +385,24 @@
       }
       NettyConnector.log.debug("Started Netty Connector version " + Version.ID);
    }
-
+   
    public synchronized void close()
    {
       if (channelFactory == null)
       {
          return;
       }
+      
+      if (batchFlusherFuture != null)
+      {
+         batchFlusherFuture.cancel(false);
+         
+         flusher.cancel();
+         
+         flusher = null;
+         
+         batchFlusherFuture = null;
+      }
 
       bootstrap = null;
       channelGroup.close().awaitUninterruptibly();
@@ -447,7 +471,7 @@
             ch.getPipeline().get(HornetQChannelHandler.class).active = true;
          }
 
-         NettyConnection conn = new NettyConnection(ch, new Listener(), batchingBufferSize);
+         NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0);
 
          return conn;
       }
@@ -673,5 +697,26 @@
          });
       }
    }
+   
+   private class BatchFlusher implements Runnable
+   {
+      private boolean cancelled;
 
+      public synchronized void run()
+      {
+         if (!cancelled)
+         {
+            for (Connection connection : connections.values())
+            {
+               connection.checkFlushBatchBuffer();
+            }
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
+
 }

Modified: trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -70,8 +70,8 @@
    
    public static final String NIO_REMOTING_THREADS_PROPNAME = "nio-remoting-threads";
    
-   public static final String BATCHING_BUFFER_SIZE_PROPNAME = "batching-buffer-size";
-
+   public static final String BATCH_DELAY = "batch-delay";
+   
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -105,8 +105,6 @@
 
    public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
    
-   public static final int DEFAULT_BATCHING_BUFFER_SIZE = 8192;
-
    public static final boolean DEFAULT_HTTP_ENABLED = false;
 
    public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
@@ -120,6 +118,8 @@
    public static final boolean DEFAULT_HTTP_REQUIRES_SESSION_ID = false;
 
    public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet";
+   
+   public static final long DEFAULT_BATCH_DELAY = 0;
 
    public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
 
@@ -145,7 +145,7 @@
       allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
-      allowableAcceptorKeys.add(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME);
+      allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
 
       ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
 
@@ -166,7 +166,7 @@
       allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
-      allowableConnectorKeys.add(TransportConstants.BATCHING_BUFFER_SIZE_PROPNAME);
+      allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
 
       ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
    }

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -568,7 +568,7 @@
    {
       return sessionFactory.getGroupID();
    }
-
+   
    public void close()
    {
       sessionFactory.close();

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -409,11 +409,9 @@
 
          UUID uid = UUIDGenerator.getInstance().generateUUID();
          
-         //msg.getCoreMessage().putBytesProperty(HornetQMessage.HORNETQ_MESSAGE_ID, uid.asBytes());
-         
          msg.getCoreMessage().putStringProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME, new SimpleString("ID:" + uid.toString()));
          
-         msg.resetMessageID(null);
+         msg.resetMessageID(null); 
       }
 
       if (foreign)

Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -110,7 +110,7 @@
    private boolean failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
 
    private String groupID = null;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -540,9 +540,8 @@
    public void setDiscoveryGroupName(String groupName)
    {
       this.discoveryGroupName = groupName;
-
    }
-
+   
    // Encoding Support Implementation --------------------------------------------------------------
    
    /* (non-Javadoc)

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -47,7 +47,7 @@
  *
  *
  */
-public class JMSServerConfigParserImpl implements JMSServerConfigParser 
+public class JMSServerConfigParserImpl implements JMSServerConfigParser
 {
    private static final Logger log = Logger.getLogger(JMSServerConfigParserImpl.class);
 
@@ -307,7 +307,7 @@
                                                                       Validators.GT_ZERO);
       String groupid = XMLConfigurationUtil.getString(e, "group-id", null, Validators.NO_CHECK);
       List<String> jndiBindings = new ArrayList<String>();
-      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String,String>>();
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
       String discoveryGroupName = null;
 
       NodeList children = node.getChildNodes();
@@ -347,11 +347,9 @@
                   {
                      backupConnectorName = backupNode.getNodeValue();
                   }
-                  
-                  
+
                   connectorNames.add(new Pair<String, String>(connectorName, backupConnectorName));
-                  
-                  
+
                }
             }
          }
@@ -368,8 +366,7 @@
 
       if (discoveryGroupName != null)
       {
-         cfConfig = new ConnectionFactoryConfigurationImpl(name,
-                                                           strbindings);
+         cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
          cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
          cfConfig.setDiscoveryGroupName(discoveryGroupName);
       }
@@ -378,7 +375,7 @@
          cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
          cfConfig.setConnectorNames(connectorNames);
       }
-      
+
       cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
       cfConfig.setClientID(clientID);
       cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -411,7 +408,6 @@
       return cfConfig;
    }
 
-
    /**
     * hook for integration layers 
     * @param topicName
@@ -432,9 +428,9 @@
     * @return
     */
    protected JMSQueueConfiguration newQueue(final String queueName,
-                                         final String selectorString,
-                                         final boolean durable,
-                                         final String[] jndiArray)
+                                            final String selectorString,
+                                            final boolean durable,
+                                            final String[] jndiArray)
    {
       return new JMSQueueConfigurationImpl(queueName, selectorString, durable, jndiArray);
    }

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -129,16 +129,16 @@
    public JMSServerManagerImpl(final HornetQServer server) throws Exception
    {
       this.server = server;
-      
+
       this.coreConfig = server.getConfiguration();
-      
+
       configFileName = null;
    }
 
    public JMSServerManagerImpl(final HornetQServer server, final String configFileName) throws Exception
    {
       this.server = server;
-      
+
       this.coreConfig = server.getConfiguration();
 
       this.configFileName = configFileName;
@@ -147,7 +147,7 @@
    public JMSServerManagerImpl(final HornetQServer server, final JMSConfiguration configuration) throws Exception
    {
       this.server = server;
-      
+
       this.coreConfig = server.getConfiguration();
 
       configFileName = null;
@@ -187,7 +187,7 @@
 
                if (configFileName != null)
                {
-                  jmsDeployer.setConfigFileNames(new String[]{configFileName});
+                  jmsDeployer.setConfigFileNames(new String[] { configFileName });
                }
 
                jmsDeployer.start();
@@ -224,16 +224,15 @@
       deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
 
       server.registerActivateCallback(this);
-      
+
       server.start();
 
-
       if (server.getReplicationEndpoint() != null)
       {
          createJournal();
          storage.installReplication(server.getReplicationEndpoint());
       }
-      
+
       started = true;
    }
 
@@ -253,17 +252,17 @@
       {
          deploymentManager.stop();
       }
-      
+
       // Storage could be null on a shared store backup server before initialization
       if (storage != null)
       {
          storage.stop();
       }
-      
+
       unbindJNDI(queueJNDI);
 
       unbindJNDI(topicJNDI);
-      
+
       unbindJNDI(connectionFactoryJNDI);
 
       for (String connectionFactory : new HashSet<String>(connectionFactories.keySet()))
@@ -284,10 +283,10 @@
       if (jmsManagementService != null)
       {
          jmsManagementService.unregisterJMSServer();
-   
+
          jmsManagementService.stop();
       }
-      
+
       server.stop();
 
       started = false;
@@ -339,7 +338,10 @@
       return server.getVersion().getFullVersion();
    }
 
-   public synchronized boolean createQueue(final String queueName, final String selectorString, final boolean durable, final String... jndi) throws Exception
+   public synchronized boolean createQueue(final String queueName,
+                                           final String selectorString,
+                                           final boolean durable,
+                                           final String... jndi) throws Exception
    {
       checkInitialised();
 
@@ -409,7 +411,6 @@
       return getJNDIList(connectionFactoryJNDI, factoryName);
    }
 
-
    public boolean addQueueToJndi(final String queueName, final String jndiBinding) throws Exception
    {
       checkInitialised();
@@ -540,14 +541,12 @@
       return true;
    }
 
-
    public synchronized boolean destroyQueue(final String name) throws Exception
    {
       checkInitialised();
 
       removeFromJNDI(queueJNDI, name);
 
-
       queues.remove(name);
       queueJNDI.remove(name);
 
@@ -571,8 +570,8 @@
 
       jmsManagementService.unregisterTopic(name);
 
-      AddressControl addressControl = (AddressControl) server.getManagementService()
-            .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
+      AddressControl addressControl = (AddressControl)server.getManagementService()
+                                                            .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
       if (addressControl != null)
       {
          for (String queueName : addressControl.getQueueNames())
@@ -581,9 +580,9 @@
             if (binding == null)
             {
                log.warn("Queue " + queueName +
-                     " doesn't exist on the topic " +
-                     name +
-                     ". It was deleted manually probably.");
+                        " doesn't exist on the topic " +
+                        name +
+                        ". It was deleted manually probably.");
                continue;
             }
 
@@ -734,7 +733,9 @@
       HornetQConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+                                                                                               discoveryAddress,
+                                                                                               discoveryPort);
          configuration.setClientID(clientID);
          configuration.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
          configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -777,12 +778,13 @@
       HornetQConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+                                                                                               discoveryAddress,
+                                                                                               discoveryPort);
          createConnectionFactory(configuration, jndiBindings);
       }
    }
 
-
    public synchronized void createConnectionFactory(final String name,
                                                     final String clientID,
                                                     final String discoveryAddress,
@@ -794,13 +796,16 @@
       HornetQConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, discoveryAddress, discoveryPort);
+         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
+                                                                                               discoveryAddress,
+                                                                                               discoveryPort);
          configuration.setClientID(clientID);
          createConnectionFactory(configuration, jndiBindings);
       }
    }
 
-   public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig, String... jndiBindings) throws Exception
+   public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig,
+                                                    String... jndiBindings) throws Exception
    {
       internalCreateCF(cfConfig);
       storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
@@ -849,7 +854,7 @@
       HornetQConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-         cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+         cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
          cf.setClientID(clientID);
          cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
          cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
@@ -919,7 +924,7 @@
       HornetQConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
-         cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(connectorConfigs);
+         cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs);
          cf.setClientID(clientID);
          cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
          cf.setConnectionTTL(connectionTTL);
@@ -952,7 +957,6 @@
       return cf;
    }
 
-
    private String[] getJNDIList(final Map<String, List<String>> map, final String name)
    {
       List<String> result = map.get(name);
@@ -981,9 +985,9 @@
       }
 
       server.getHornetQServerControl().deployQueue(hqQueue.getAddress(),
-            hqQueue.getAddress(),
-            coreFilterString,
-            durable);
+                                                   hqQueue.getAddress(),
+                                                   coreFilterString,
+                                                   durable);
 
       queues.put(queueName, hqQueue);
 
@@ -1008,9 +1012,9 @@
       // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
       // subscriptions - core has no notion of a topic
       server.getHornetQServerControl().deployQueue(hqTopic.getAddress(),
-            hqTopic.getAddress(),
-            JMSServerManagerImpl.REJECT_FILTER,
-            true);
+                                                   hqTopic.getAddress(),
+                                                   JMSServerManagerImpl.REJECT_FILTER,
+                                                   true);
 
       topics.put(topicName, hqTopic);
 
@@ -1024,7 +1028,8 @@
     * @throws HornetQException
     * @throws Exception
     */
-   private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException, Exception
+   private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException,
+                                                                                                   Exception
    {
 
       List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = lookupConnectors(cfConfig);
@@ -1034,71 +1039,71 @@
       if (cfConfig.getDiscoveryAddress() != null)
       {
          cf = internalCreateConnectionFactory(cfConfig.getName(),
-               cfConfig.getDiscoveryAddress(),
-               cfConfig.getDiscoveryPort(),
-               cfConfig.getClientID(),
-               cfConfig.getDiscoveryRefreshTimeout(),
-               cfConfig.getClientFailureCheckPeriod(),
-               cfConfig.getConnectionTTL(),
-               cfConfig.getCallTimeout(),
-               cfConfig.isCacheLargeMessagesClient(),
-               cfConfig.getMinLargeMessageSize(),
-               cfConfig.getConsumerWindowSize(),
-               cfConfig.getConsumerMaxRate(),
-               cfConfig.getConfirmationWindowSize(),
-               cfConfig.getProducerWindowSize(),
-               cfConfig.getProducerMaxRate(),
-               cfConfig.isBlockOnAcknowledge(),
-               cfConfig.isBlockOnDurableSend(),
-               cfConfig.isBlockOnNonDurableSend(),
-               cfConfig.isAutoGroup(),
-               cfConfig.isPreAcknowledge(),
-               cfConfig.getLoadBalancingPolicyClassName(),
-               cfConfig.getTransactionBatchSize(),
-               cfConfig.getDupsOKBatchSize(),
-               cfConfig.getInitialWaitTimeout(),
-               cfConfig.isUseGlobalPools(),
-               cfConfig.getScheduledThreadPoolMaxSize(),
-               cfConfig.getThreadPoolMaxSize(),
-               cfConfig.getRetryInterval(),
-               cfConfig.getRetryIntervalMultiplier(),
-               cfConfig.getMaxRetryInterval(),
-               cfConfig.getReconnectAttempts(),
-               cfConfig.isFailoverOnServerShutdown(),
-               cfConfig.getGroupID());
+                                              cfConfig.getDiscoveryAddress(),
+                                              cfConfig.getDiscoveryPort(),
+                                              cfConfig.getClientID(),
+                                              cfConfig.getDiscoveryRefreshTimeout(),
+                                              cfConfig.getClientFailureCheckPeriod(),
+                                              cfConfig.getConnectionTTL(),
+                                              cfConfig.getCallTimeout(),
+                                              cfConfig.isCacheLargeMessagesClient(),
+                                              cfConfig.getMinLargeMessageSize(),
+                                              cfConfig.getConsumerWindowSize(),
+                                              cfConfig.getConsumerMaxRate(),
+                                              cfConfig.getConfirmationWindowSize(),
+                                              cfConfig.getProducerWindowSize(),
+                                              cfConfig.getProducerMaxRate(),
+                                              cfConfig.isBlockOnAcknowledge(),
+                                              cfConfig.isBlockOnDurableSend(),
+                                              cfConfig.isBlockOnNonDurableSend(),
+                                              cfConfig.isAutoGroup(),
+                                              cfConfig.isPreAcknowledge(),
+                                              cfConfig.getLoadBalancingPolicyClassName(),
+                                              cfConfig.getTransactionBatchSize(),
+                                              cfConfig.getDupsOKBatchSize(),
+                                              cfConfig.getInitialWaitTimeout(),
+                                              cfConfig.isUseGlobalPools(),
+                                              cfConfig.getScheduledThreadPoolMaxSize(),
+                                              cfConfig.getThreadPoolMaxSize(),
+                                              cfConfig.getRetryInterval(),
+                                              cfConfig.getRetryIntervalMultiplier(),
+                                              cfConfig.getMaxRetryInterval(),
+                                              cfConfig.getReconnectAttempts(),
+                                              cfConfig.isFailoverOnServerShutdown(),
+                                              cfConfig.getGroupID());
       }
       else
       {
          cf = internalCreateConnectionFactory(cfConfig.getName(),
-               connectorConfigs,
-               cfConfig.getClientID(),
-               cfConfig.getClientFailureCheckPeriod(),
-               cfConfig.getConnectionTTL(),
-               cfConfig.getCallTimeout(),
-               cfConfig.isCacheLargeMessagesClient(),
-               cfConfig.getMinLargeMessageSize(),
-               cfConfig.getConsumerWindowSize(),
-               cfConfig.getConsumerMaxRate(),
-               cfConfig.getConfirmationWindowSize(),
-               cfConfig.getProducerWindowSize(),
-               cfConfig.getProducerMaxRate(),
-               cfConfig.isBlockOnAcknowledge(),
-               cfConfig.isBlockOnDurableSend(),
-               cfConfig.isBlockOnNonDurableSend(),
-               cfConfig.isAutoGroup(),
-               cfConfig.isPreAcknowledge(),
-               cfConfig.getLoadBalancingPolicyClassName(),
-               cfConfig.getTransactionBatchSize(),
-               cfConfig.getDupsOKBatchSize(),
-               cfConfig.isUseGlobalPools(),
-               cfConfig.getScheduledThreadPoolMaxSize(),
-               cfConfig.getThreadPoolMaxSize(),
-               cfConfig.getRetryInterval(),
-               cfConfig.getRetryIntervalMultiplier(),
-               cfConfig.getMaxRetryInterval(),
-               cfConfig.getReconnectAttempts(),
-               cfConfig.isFailoverOnServerShutdown(),
-               cfConfig.getGroupID());
+                                              connectorConfigs,
+                                              cfConfig.getClientID(),
+                                              cfConfig.getClientFailureCheckPeriod(),
+                                              cfConfig.getConnectionTTL(),
+                                              cfConfig.getCallTimeout(),
+                                              cfConfig.isCacheLargeMessagesClient(),
+                                              cfConfig.getMinLargeMessageSize(),
+                                              cfConfig.getConsumerWindowSize(),
+                                              cfConfig.getConsumerMaxRate(),
+                                              cfConfig.getConfirmationWindowSize(),
+                                              cfConfig.getProducerWindowSize(),
+                                              cfConfig.getProducerMaxRate(),
+                                              cfConfig.isBlockOnAcknowledge(),
+                                              cfConfig.isBlockOnDurableSend(),
+                                              cfConfig.isBlockOnNonDurableSend(),
+                                              cfConfig.isAutoGroup(),
+                                              cfConfig.isPreAcknowledge(),
+                                              cfConfig.getLoadBalancingPolicyClassName(),
+                                              cfConfig.getTransactionBatchSize(),
+                                              cfConfig.getDupsOKBatchSize(),
+                                              cfConfig.isUseGlobalPools(),
+                                              cfConfig.getScheduledThreadPoolMaxSize(),
+                                              cfConfig.getThreadPoolMaxSize(),
+                                              cfConfig.getRetryInterval(),
+                                              cfConfig.getRetryIntervalMultiplier(),
+                                              cfConfig.getMaxRetryInterval(),
+                                              cfConfig.getReconnectAttempts(),
+                                              cfConfig.isFailoverOnServerShutdown(),
+                                              cfConfig.getGroupID());
       }
       connectionFactories.put(cfConfig.getName(), cf);
 
@@ -1308,16 +1313,16 @@
 
          DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
          discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
-               .get(cfConfig.getDiscoveryGroupName());
+                                                    .get(cfConfig.getDiscoveryGroupName());
 
          if (discoveryGroupConfiguration == null)
          {
             JMSServerManagerImpl.log.warn("There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
-                  "' deployed.");
+                                          "' deployed.");
 
             throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                  "There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
-                        "' deployed.");
+                                       "There is no discovery group with name '" + cfConfig.getDiscoveryGroupName() +
+                                                "' deployed.");
          }
 
          cfConfig.setDiscoveryAddress(discoveryGroupConfiguration.getGroupAddress());
@@ -1395,10 +1400,9 @@
    private void initJournal() throws Exception
    {
       this.coreConfig = server.getConfiguration();
-      
 
       createJournal();
-      
+
       storage.load();
 
       List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
@@ -1414,9 +1418,7 @@
       {
          if (destination.getType() == PersistedType.Queue)
          {
-            internalCreateQueue(destination.getName(),
-                  destination.getSelector(),
-                  destination.isDurable());
+            internalCreateQueue(destination.getName(), destination.getSelector(), destination.isDurable());
          }
          else if (destination.getType() == PersistedType.Topic)
          {
@@ -1461,7 +1463,6 @@
             mapJNDI.put(record.getName(), jndiList);
          }
 
-
          for (String jndi : record.getJndi())
          {
             jndiList.add(jndi);
@@ -1479,7 +1480,9 @@
       {
          if (coreConfig.isPersistenceEnabled())
          {
-            storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
+            storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(),
+                                                       server.getConfiguration(),
+                                                       server.getReplicationManager());
          }
          else
          {
@@ -1518,8 +1521,9 @@
       return true;
    }
 
-
-   private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap, final String name, final String jndi) throws Exception
+   private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap,
+                                               final String name,
+                                               final String jndi) throws Exception
    {
       checkInitialised();
       List<String> jndiBindings = jndiMap.get(name);
@@ -1564,7 +1568,7 @@
             {
                JMSServerManagerImpl.log.warn("There is no connector with name '" + connectorName + "' deployed.");
                throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                     "There is no connector with name '" + connectorName + "' deployed.");
+                                          "There is no connector with name '" + connectorName + "' deployed.");
             }
 
             TransportConfiguration backupConnector = null;
@@ -1576,10 +1580,10 @@
                if (backupConnector == null)
                {
                   JMSServerManagerImpl.log.warn("There is no backup connector with name '" + backupConnectorName +
-                        "' deployed.");
+                                                "' deployed.");
                   throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                        "There is no backup connector with name '" + backupConnectorName +
-                              "' deployed.");
+                                             "There is no backup connector with name '" + backupConnectorName +
+                                                      "' deployed.");
                }
             }
 

Modified: trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/Connection.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/src/main/org/hornetq/spi/core/remoting/Connection.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -14,7 +14,6 @@
 package org.hornetq.spi.core.remoting;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
 
 /**
  * The connection used by a channel to write data to.
@@ -40,19 +39,20 @@
    Object getID();
 
    /**
-    * writes the buffer to the wire.
+    * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
     *
     * @param buffer the buffer to write
+    * @param flush  whether to flush the buffers onto the wire
+    * @param batched whether the packet is allowed to batched for better performance
     */
-   void write(HornetQBuffer buffer);
-
+   void write(HornetQBuffer buffer, boolean flush, boolean batched);
+   
    /**
-    * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+    * writes the buffer to the connection with no flushing or batching
     *
     * @param buffer the buffer to write
-    * @param flush  whether to flush the buffers onto the wire
     */
-   void write(HornetQBuffer buffer, boolean flush);
+   void write(HornetQBuffer buffer);
 
    /**
     * closes this connection.
@@ -67,10 +67,7 @@
    String getRemoteAddress();
    
    /**
-    * The batch size in bytes of the buffer for batching sends
-    * or -1 if the connection does not support batching
-    * 
-    * @return
+    * Called periodically to flush any data in the batch buffer
     */
-   int getBatchingBufferSize();
+   void checkFlushBatchBuffer();
 }
\ No newline at end of file

Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/config/ConfigurationTest-full-config.xml	2010-03-29 13:35:27 UTC (rev 8998)
@@ -48,7 +48,7 @@
       <large-messages-directory>largemessagesdir</large-messages-directory>
       <memory-warning-threshold>95</memory-warning-threshold>
       <memory-measure-interval>54321</memory-measure-interval>
-      
+
       <remoting-interceptors>
          <class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor1</class-name>
          <class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor2</class-name>

Modified: trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -68,7 +68,6 @@
       server.start();
 
       ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(connectorFactoryClassName));
-      // sf.setConsumerWindowSize(0);
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -109,8 +108,6 @@
       {
          ClientMessage message2 = consumer.receive();
 
-         // log.info("got message " + i);
-
          HornetQBuffer buffer = message2.getBodyBuffer();
 
          Assert.assertEquals("testINVMCoreClient", buffer.readString());

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -90,7 +90,7 @@
    {
       testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
    }
-
+   
    public void testFlowControlSingleConsumerSlowConsumer() throws Exception
    {
       testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
@@ -331,6 +331,8 @@
       ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
 
       session.close();
+      
+      sf.close();
 
       server.stop();
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -62,7 +62,9 @@
 
          message.getBodyBuffer().writeString(body);
 
+       //  log.info("sending message");
          producer.send(message);
+        // log.info("sent message");
 
          ClientMessage received1 = consumer1.receive(1000);
          Assert.assertNotNull(received1);

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -93,7 +93,7 @@
       }
 
       @Override
-      public void write(final HornetQBuffer buffer, final boolean flush)
+      public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch)
       {
          InVMConnector.log.info("calling mock connection write");
          if (callback != null)
@@ -101,7 +101,7 @@
             callback.onWrite(buffer);
          }
 
-         super.write(buffer, flush);
+         super.write(buffer, flush, batch);
       }
    }
 }

Added: trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/BatchDelayTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * 
+ * A BatchDelayTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class BatchDelayTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(BatchDelayTest.class);
+
+   private static final long DELAY = 500;
+   
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.BATCH_DELAY, DELAY);
+
+      TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+
+      Configuration config = new ConfigurationImpl();
+      config.getAcceptorConfigurations().add(tc);
+
+      config.setSecurityEnabled(false);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   protected ClientSessionFactory createSessionFactory()
+   {
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.BATCH_DELAY, DELAY);
+
+      ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName(),
+                                                                                                    params));
+
+      return sf;
+   }
+
+   public void testSendReceiveMany() throws Exception
+   {
+      ClientSessionFactory sf = createSessionFactory();
+
+      ClientSession session = sf.createSession();
+
+      final String foo = "foo";
+
+      session.createQueue(foo, foo);
+
+      ClientProducer prod = session.createProducer(foo);
+
+      ClientConsumer cons = session.createConsumer(foo);
+
+      session.start();
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = session.createMessage(false);
+
+         prod.send(msg);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = cons.receive(10000);
+
+         assertNotNull(msg);
+
+         msg.acknowledge();
+      }
+
+      sf.close();
+   }
+
+   public void testSendReceiveOne() throws Exception
+   {
+      ClientSessionFactory sf = createSessionFactory();
+
+      ClientSession session = sf.createSession();
+
+      final String foo = "foo";
+
+      session.createQueue(foo, foo);
+
+      ClientProducer prod = session.createProducer(foo);
+
+      ClientConsumer cons = session.createConsumer(foo);
+
+      session.start();
+
+      ClientMessage msg = session.createMessage(false);
+
+      long start = System.currentTimeMillis();
+      
+      prod.send(msg);
+
+      msg = cons.receive(10000);
+
+      assertNotNull(msg);
+      
+      long end = System.currentTimeMillis();
+      
+      //This will be delayed
+      
+      assertTrue(end - start > DELAY);
+
+      msg.acknowledge();
+
+      sf.close();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -19,7 +19,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;

Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -71,7 +71,6 @@
       Assert.assertEquals(false, conf.isCreateBindingsDir());
       Assert.assertEquals("somedir2", conf.getJournalDirectory());
       Assert.assertEquals(false, conf.isCreateJournalDir());
-
       Assert.assertEquals(JournalType.NIO, conf.getJournalType());
       Assert.assertEquals(10000, conf.getJournalBufferSize_NIO());
       Assert.assertEquals(1000, conf.getJournalBufferTimeout_NIO());

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-03-29 11:47:29 UTC (rev 8997)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-03-29 13:35:27 UTC (rev 8998)
@@ -47,7 +47,7 @@
    public void testGetID() throws Exception
    {
       Channel channel = new SimpleChannel(RandomUtil.randomInt());
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
 
       Assert.assertEquals(channel.getId().intValue(), conn.getID());
    }
@@ -59,7 +59,7 @@
 
       Assert.assertEquals(0, channel.getWritten().size());
 
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
       conn.write(buff);
 
       Assert.assertEquals(1, channel.getWritten().size());
@@ -68,7 +68,7 @@
    public void testCreateBuffer() throws Exception
    {
       Channel channel = new SimpleChannel(RandomUtil.randomInt());
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), -1);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
 
       final int size = 1234;
 



More information about the hornetq-commits mailing list