[hornetq-commits] JBoss hornetq SVN: r9242 - in trunk: src/config/jboss-as-4/clustered and 24 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 18 08:00:18 EDT 2010


Author: timfox
Date: 2010-05-18 08:00:15 -0400 (Tue, 18 May 2010)
New Revision: 9242

Modified:
   trunk/docs/user-manual/en/configuring-transports.xml
   trunk/docs/user-manual/en/flow-control.xml
   trunk/docs/user-manual/en/ha.xml
   trunk/docs/user-manual/en/perf-tuning.xml
   trunk/docs/user-manual/en/thread-pooling.xml
   trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
   trunk/src/config/trunk/clustered/hornetq-configuration.xml
   trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
   trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
   trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
added direct delivery plus some docs tweaks

Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/configuring-transports.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -188,20 +188,23 @@
                 with firewall policies that typically only allow connections to be initiated in one
                 direction.</para>
             <para>All the valid Netty transport keys are defined in the class <literal
-                    >org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. The
-                parameters can be used either with acceptors or connectors. The following parameters
-                can be used to configure Netty for simple TCP:</para>
+                    >org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. Most
+                parameters can be used either with acceptors or connectors, some only work with
+                acceptors. The following parameters can be used to configure Netty for simple
+                TCP:</para>
             <itemizedlist>
                 <listitem>
                     <para><literal>use-nio</literal>. If this is <literal>true</literal> then Java
                         non blocking NIO will be used. If set to <literal>false</literal> than old
                         blocking Java IO will be used.</para>
-                    <para>We highly recommend that you use non blocking Java NIO. Java NIO does not
-                        maintain a thread per connection so can scale to many more concurrent
-                        connections than with old blocking IO. We recommend the usage of Java 6 for
-                        NIO and the best scalability. The default value for this property is
-                            <literal>true</literal> on the server side and <literal>false</literal>
-                        on the client side.</para>
+                    <para>If you require the server to handle many concurrent connections, we highly
+                        recommend that you use non blocking Java NIO. Java NIO does not maintain a
+                        thread per connection so can scale to many more concurrent connections than
+                        with old blocking IO. If you don't require the server to handle many
+                        concurrent connections, you might get slightly better performance by using
+                        old (blocking) IO. The default value for this property is <literal
+                            >false</literal> on the server side and <literal>false</literal> on the
+                        client side.</para>
                 </listitem>
                 <listitem>
                     <para><literal>host</literal>. This specifies the host name or IP address to
@@ -233,8 +236,8 @@
                         is <literal>true</literal>.</para>
                 </listitem>
                 <listitem>
-                    <para><literal>tcp-send-buffer-size</literal>. This parameter determines the size
-                        of the TCP send buffer in bytes. The default value for this property is
+                    <para><literal>tcp-send-buffer-size</literal>. This parameter determines the
+                        size of the TCP send buffer in bytes. The default value for this property is
                             <literal>32768</literal> bytes (32KiB).</para>
                     <para>TCP buffer sizes should be tuned according to the bandwidth and latency of
                         your network. Here's a good link that explains the theory behind <ulink
@@ -254,6 +257,37 @@
                         size of the TCP receive buffer in bytes. The default value for this property
                         is <literal>32768</literal> bytes (32KiB).</para>
                 </listitem>
+                <listitem>
+                    <para><literal>batch-delay</literal>. Before writing packets to the transport,
+                        HornetQ can be configured to batch up writes for a maximum of <literal
+                            >batch-delay</literal> milliseconds. This can increase overall
+                        throughput for very small messages. It does so at the expense of an increase
+                        in average latency for message transfer. The default value for this property
+                        is <literal>0</literal> ms.</para>
+                </listitem>
+                <listitem>
+                    <para><literal>direct-deliver</literal>. When a message arrives on the server
+                        and is delivered to waiting consumers, by default, the delivery is done on a
+                        different thread to that which the message arrived on. This gives the best
+                        overall throughput and scalability, especially on multi-core machines.
+                        However it also introduces some extra latency due to the extra context
+                        switch required. If you want the lowest latency and the possible expense of
+                        some reduction in throughput then you can make sure <literal
+                            >direct-deliver</literal> to true. The default value for this parameter
+                        is <literal>true</literal>. If you are willingh to take some small extra hit
+                        on latency but want the highest throughput set this parameter to <literal
+                            >false</literal>.</para>
+                </listitem>
+                <listitem>
+                    <para><literal>nio-remoting-threads</literal>. When configured to use NIO,
+                        HornetQ will, by default, use a number of threads equal to three times the
+                        number of cores (or hyper-threads) as reported by <literal
+                            >Runtime.getRuntime().availableProcessors()</literal> for processing
+                        incoming packets. If you want to override this value, you can set the number
+                        of threads by specifying this parameter. The default value for this
+                        parameter is <literal>-1</literal> which means use the value from <literal
+                            >Runtime.getRuntime().availableProcessors()</literal> * 3.</para>
+                </listitem>
             </itemizedlist>
         </section>
         <section>
@@ -269,20 +303,20 @@
                         SSL.</para>
                 </listitem>
                 <listitem>
-                    <para><literal>key-store-path</literal>. This is the path to the SSL key store on
-                        the client which holds the client certificates.</para>
+                    <para><literal>key-store-path</literal>. This is the path to the SSL key store
+                        on the client which holds the client certificates.</para>
                 </listitem>
                 <listitem>
                     <para><literal>key-store-password</literal>. This is the password for the client
                         certificate key store on the client.</para>
                 </listitem>
                 <listitem>
-                    <para><literal>trust-store-path</literal>. This is the path to the trusted client
-                        certificate store on the server.</para>
+                    <para><literal>trust-store-path</literal>. This is the path to the trusted
+                        client certificate store on the server.</para>
                 </listitem>
                 <listitem>
-                    <para><literal>trust-store-password</literal>. This is the password to the trusted
-                        client certificate store on the server.</para>
+                    <para><literal>trust-store-password</literal>. This is the password to the
+                        trusted client certificate store on the server.</para>
                 </listitem>
             </itemizedlist>
         </section>
@@ -303,8 +337,8 @@
                         before sending an empty http request to keep the connection alive</para>
                 </listitem>
                 <listitem>
-                    <para><literal>http-client-idle-scan-period</literal>. How often, in milliseconds,
-                        to scan for idle clients</para>
+                    <para><literal>http-client-idle-scan-period</literal>. How often, in
+                        milliseconds, to scan for idle clients</para>
                 </listitem>
                 <listitem>
                     <para><literal>http-response-time</literal>. How long the server can wait before

Modified: trunk/docs/user-manual/en/flow-control.xml
===================================================================
--- trunk/docs/user-manual/en/flow-control.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/flow-control.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -261,10 +261,12 @@
                100000 bytes and would block any producers sending to that address to prevent that
                max size being exceeded.</para>
             <para>Note the policy must be set to <literal>BLOCK</literal> to enable blocking producer
-               flow control.</para>
-            <para>Please note the default value for <literal>address-full-policy</literal> is to
-                  <literal>PAGE</literal> (see <xref linkend="paging" /> for more information on
-               paging).</para>
+            flow control.</para>
+            <note><para>Note that in the default configuration all addresses are set to block producers after 10 MiB of message data
+            is in the address. This means you cannot send more than 10MiB of message data to an address without it being consumed before the producers
+            will be blocked. If you do not want this behaviour increase the <literal>max-size-bytes</literal> parameter or change the 
+            address full message policy.</para>
+            </note>            
          </section>
       </section>
       <section>

Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/ha.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -135,7 +135,9 @@
             <section id="ha.mode.shared">
                 <title>Shared Store</title>
                 <para>When using a shared store, both live and backup servers share the
-                        <emphasis>same</emphasis> journal using a shared file system. </para>
+                        <emphasis>same</emphasis> entire data directory using a shared file system.
+                    This means the paging directory, journal directory, large messages and binding
+                    journal.</para>
                 <para>When failover occurs and the backup server takes over, it will load the
                     persistent storage from the shared file system and clients can connect to
                     it.</para>

Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/perf-tuning.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -99,6 +99,11 @@
                     really need durable messages then set them to be non-durable. Durable messages
                     incur a lot more overhead in persisting them to storage.</para>
             </listitem>
+            <listitem>
+                <para>Batch many sends or acknowledgements in a single transaction. HornetQ will
+                    only require a network round trip on the commit, not on every send or
+                    acknowledgement.</para>
+            </listitem>
         </itemizedlist>
     </section>
     <section>
@@ -154,17 +159,18 @@
                     for more information.</para>
             </listitem>
             <listitem>
-                <para>If you have very fast consumers, you can increase consumer-window-size. This effectively disables consumer flow control.</para>
+                <para>If you have very fast consumers, you can increase consumer-window-size. This
+                    effectively disables consumer flow control.</para>
             </listitem>
             <listitem>
-                <para>Socket NIO vs Socket Old IO. By default HornetQ uses Socket NIO on the server
-                    and old (blocking) IO on the client side (see the chapter on configuring
-                    transports for more information <xref linkend="configuring-transports"/>). NIO
-                    is much more scalable but can give you some latency hit compared to old blocking
-                    IO. If you expect to be able to service many thousands of connections on the
-                    server, then continue to use NIO on the server. However, if don't expect many
-                    thousands of connections on the server you can configure the server acceptors to
-                    use old IO, and might get a small performance advantage.</para>
+                <para>Socket NIO vs Socket Old IO. By default HornetQ uses old (blocking) on the
+                    server and the client side (see the chapter on configuring transports for more
+                    information <xref linkend="configuring-transports"/>). NIO is much more scalable
+                    but can give you some latency hit compared to old blocking IO. If you need to be
+                    able to service many thousands of connections on the server, then you should
+                    make sure you're using NIO on the server. However, if don't expect many
+                    thousands of connections on the server you can keep the server acceptors using
+                    old IO, and might get a small performance advantage.</para>
             </listitem>
             <listitem>
                 <para>Use the core API not JMS. Using the JMS API you will have slightly lower
@@ -180,7 +186,7 @@
     </section>
     <section>
         <title>Tuning Transport Settings</title>
-        <itemizedlist>            
+        <itemizedlist>
             <listitem>
                 <para>TCP buffer sizes. If you have a fast network and fast machines you may get a
                     performance boost by increasing the TCP send and receive buffer sizes. See the
@@ -202,13 +208,23 @@
                     This would allow up to 20000 file handles to be open by the user <literal
                         >serveruser</literal>. </para>
             </listitem>
+            <listitem>
+                <para>Use <literal>batch-delay</literal> and set <literal>direct-deliver</literal>
+                    to false for the best throughput for very small messages. HornetQ comes with a
+                    preconfigured connector/acceptor pair (<literal>netty-throughput</literal>) in
+                        <literal>hornetq-configuration.xml</literal> and JMS connection factory
+                        (<literal>ThroughputConnectionFactory</literal>) in <literal
+                        >hornetq-jms.xml</literal>which can be used to give the very best
+                    throughput, especially for small messages. See the <xref
+                        linkend="configuring-transports"/> for more information on this.</para>
+            </listitem>
         </itemizedlist>
     </section>
     <section>
         <title>Tuning the VM</title>
-        <para>We highly recommend you use the latest Java JVM for the best performance. We test internally using the
-            Sun JVM, so some of these tunings won't apply to JDKs from other providers (e.g. IBM or
-            JRockit)</para>
+        <para>We highly recommend you use the latest Java JVM for the best performance. We test
+            internally using the Sun JVM, so some of these tunings won't apply to JDKs from other
+            providers (e.g. IBM or JRockit)</para>
         <itemizedlist>
             <listitem>
                 <para>Garbage collection. For smooth server operation we recommend using a parallel
@@ -223,15 +239,6 @@
                     size and number of your messages. Use the JVM arguments <literal>-Xms</literal>
                     and <literal>-Xmx</literal> to set server available RAM. We recommend setting
                     them to the same high value.</para>
-                <para>HornetQ will regularly sample JVM memory and reports if the available memory
-                    is below a configurable threshold. Use this information to properly set JVM
-                    memory and paging. The sample is disabled by default. To enabled it, configure
-                    the sample frequency by setting <literal>memory-measure-interval</literal> in
-                        <literal>hornetq-configuration.xml</literal> (in milliseconds). When the
-                    available memory goes below the configured threshold, a warning is logged. The
-                    threshold can be also configured by setting <literal
-                        >memory-warning-threshold</literal> in <literal
-                        >hornetq-configuration.xml</literal> (default is 25%).</para>
             </listitem>
             <listitem>
                 <para>Aggressive options. Different JVMs provide different sets of JVM tuning
@@ -257,7 +264,10 @@
                 <note>
                     <para>Some popular libraries such as the Spring JMS Template are known to use
                         these anti-patterns. If you're using Spring JMS Template and you're getting
-                        poor performance you know why. Don't blame HornetQ!</para>
+                        poor performance you know why. Don't blame HornetQ! The Spring JMS Template
+                        can only safely be used in an app server which caches JMS sessions (e.g.
+                        using JCA), and only then for sending messages. It cannot be safely be used
+                        for synchronously consuming messages, even in an app server. </para>
                 </note>
             </listitem>
             <listitem>

Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/thread-pooling.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -28,6 +28,14 @@
             thread pool for scheduled use. A Java scheduled thread pool cannot be configured to use
             a standard thread pool, otherwise we could use a single thread pool for both scheduled
             and non scheduled activity.</para>
+        <para>When using old (blocking) IO, a separate thread pool is also used to service connections. Since old IO requires a thread per connection
+        it does not make sense to get them from the standard pool as the pool will easily get exhausted if too many connections are made, resulting in the
+        server "hanging" since it has no remaining threads to do anything else.</para>
+        <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to three times the number of cores (or hyper-threads)
+            as reported by Runtime.getRuntime().availableProcessors() for processing incoming packets.
+            If you want to override this value, you can set the number of threads by specifying the parameter <literal>nio-remoting-threads</literal>
+            in the transport configuration. See the
+            <xref linkend="configuring-transports"/> for more information on this.</para>
         <para>There are also a small number of other places where threads are used directly, we'll
             discuss each in turn.</para>
         <section id="server.scheduled.thread.pool">
@@ -60,7 +68,7 @@
                 bounded thread pool is used with caution since it can lead to dead-lock situations
                 if the upper bound is chosen to be too low.</para>
             <para>The default value for <literal>thread-pool-max-size</literal> is <literal
-                    >-1</literal>, i.e. the thread pool is unbounded.</para>
+                    >30</literal>.</para>
             <para>See the <ulink
                     url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html"
                     >J2SE javadoc</ulink> for more information on unbounded (cached), and bounded

Modified: trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -40,7 +40,7 @@
          <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
-         <param key="batch-delay" value="50"/>
+         <param key="batch-delay" value="50"/>         
       </connector>
 
       <connector name="in-vm">
@@ -62,6 +62,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -62,6 +62,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -62,6 +62,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
          <param key="host"  value="${jboss.bind.address:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
 
       <acceptor name="in-vm">

Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -41,6 +41,7 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
    </acceptors>
 

Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -39,6 +39,7 @@
          <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
          <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
          <param key="batch-delay" value="50"/>
+         <param key="direct-deliver" value="false"/>
       </acceptor>
    </acceptors>
 

Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -33,6 +33,7 @@
             <param key="host"  value="${jboss.bind.address:localhost}"/>
             <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
             <param key="batch-delay" value="50"/>
+            <param key="direct-deliver" value="false"/>
       </acceptor>
 	</acceptors>
 	

Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2010-05-18 12:00:15 UTC (rev 9242)
@@ -31,6 +31,7 @@
             <param key="host"  value="${jboss.bind.address:localhost}"/>
             <param key="port"  value="${hornetq.remoting.netty.port:5455}"/>
             <param key="batch-delay" value="50"/>
+            <param key="direct-deliver" value="false"/>
         </acceptor>
 	</acceptors>
 	

Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -838,6 +838,7 @@
       for (int i = 0; i < paramsNodes.getLength(); i++)
       {
          Node paramNode = paramsNodes.item(i);
+         
          NamedNodeMap attributes = paramNode.getAttributes();
 
          Node nkey = attributes.getNamedItem("key");

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -985,7 +985,7 @@
 
          }
 
-         postOffice.route(message, depageTransaction);
+         postOffice.route(message, depageTransaction, false);
 
          // This means the page is duplicated. So we need to ignore this
          if (depageTransaction.getState() == State.ROLLBACK_ONLY)

Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -49,11 +49,11 @@
 
    Bindings getMatchingBindings(SimpleString address);
    
-   void route(ServerMessage message) throws Exception;
+   void route(ServerMessage message, boolean direct) throws Exception;
 
-   void route(ServerMessage message, Transaction tx) throws Exception;
+   void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context) throws Exception;
+   void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
 
    MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -64,7 +64,6 @@
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -529,17 +528,17 @@
       return addressManager.getMatchingBindings(address);
    }
 
-   public void route(final ServerMessage message) throws Exception
+   public void route(final ServerMessage message, final boolean direct) throws Exception
    {
-      route(message, (Transaction)null);
+      route(message, (Transaction)null, direct);
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception
    {
-      this.route(message, new RoutingContextImpl(tx));
+      route(message, new RoutingContextImpl(tx), direct);
    }
    
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception
+   public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
    {
       // Sanity check
       if (message.getRefCount() > 0)
@@ -656,13 +655,13 @@
 
                message.setAddress(dlaAddress);
 
-               route(message, context.getTransaction());
+               route(message, context.getTransaction(), false);
             }
          }
       }
       else
       {
-         processRoute(message, context);
+         processRoute(message, context, direct);
       }
 
       if (startedTx)
@@ -689,7 +688,7 @@
 
       if (tx == null)
       {
-         queue.addLast(reference);
+         queue.addLast(reference, false);
       }
       else
       {
@@ -717,7 +716,7 @@
 
          if (routed)
          {
-            processRoute(message, context);
+            processRoute(message, context, false);
 
             res = true;
          }
@@ -778,7 +777,7 @@
 
          message.setAddress(queueName);
          message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
-         routeDirect(message, queue, false);
+         routeQueueInfo(message, queue, false);
 
          for (QueueInfo info : queueInfos.values())
          {
@@ -793,7 +792,7 @@
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-               routeDirect(message, queue, true);
+               routeQueueInfo(message, queue, true);
 
                int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
 
@@ -806,7 +805,7 @@
                   message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                   message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-                  routeDirect(message, queue, true);
+                  routeQueueInfo(message, queue, true);
                }
 
                if (info.getFilterStrings() != null)
@@ -821,7 +820,7 @@
                      message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
                      message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-                     routeDirect(message, queue, true);
+                     routeQueueInfo(message, queue, true);
                   }
                }
             }
@@ -838,7 +837,7 @@
       message.setPagingStore(store);
    }
 
-   private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
+   private void routeQueueInfo(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
    {
       if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message))
       {
@@ -846,11 +845,11 @@
 
          queue.route(message, context);
 
-         processRoute(message, context);
+         processRoute(message, context, false);
       }
    }
 
-   private void processRoute(final ServerMessage message, final RoutingContext context) throws Exception
+   private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
    {
       final List<MessageReference> refs = new ArrayList<MessageReference>();
 
@@ -951,7 +950,7 @@
 
             public void done()
             {
-               addReferences(refs);
+               addReferences(refs, direct);
             }
          });
       }
@@ -960,11 +959,11 @@
    /**
     * @param refs
     */
-   private void addReferences(final List<MessageReference> refs)
+   private void addReferences(final List<MessageReference> refs, final boolean direct)
    {
       for (MessageReference ref : refs)
       {
-         ref.getQueue().addLast(ref);
+         ref.getQueue().addLast(ref, direct);
       }
    }
 
@@ -985,7 +984,6 @@
       ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
 
       message.setAddress(queueName);
-      // message.setDurable(true);
 
       String uid = UUIDGenerator.getInstance().generateStringUUID();
 
@@ -1199,7 +1197,7 @@
                   // This could happen when the PageStore left the pageState
 
                   // TODO is this correct - don't we lose transactionality here???
-                  route(message);
+                  route(message, false);
                }
                first = false;
             }
@@ -1235,7 +1233,7 @@
       {
          for (MessageReference ref : refs)
          {
-            ref.getQueue().addLast(ref);
+            ref.getQueue().addLast(ref, false);
          }
       }
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -92,10 +92,12 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.impl.netty.NettyConnection;
 import org.hornetq.core.server.BindingQueryResult;
 import org.hornetq.core.server.QueueQueryResult;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.remoting.Connection;
 
 /**
  * A ServerSessionPacketHandler
@@ -119,6 +121,8 @@
    private final Channel channel;
 
    private volatile CoreRemotingConnection remotingConnection;
+   
+   private final boolean direct;
 
    public ServerSessionPacketHandler(final ServerSession session,
                                      final OperationContext sessionContext,
@@ -134,7 +138,19 @@
       this.channel = channel;
 
       this.remotingConnection = channel.getConnection();
-
+      
+      //TODO think of a better way of doing this
+      Connection conn = remotingConnection.getTransportConnection();
+      
+      if (conn instanceof NettyConnection)
+      {
+         direct = ((NettyConnection)conn).isDirectDeliver();
+      }
+      else
+      {
+         direct = false;
+      }
+      
       addConnectionListeners();
    }
 
@@ -442,7 +458,7 @@
                {
                   SessionSendMessage message = (SessionSendMessage)packet;
                   requiresResponse = message.isRequiresResponse();
-                  session.send((ServerMessage)message.getMessage());
+                  session.send((ServerMessage)message.getMessage(), direct);
                   if (requiresResponse)
                   {
                      response = new NullResponseMessage();

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -535,7 +535,7 @@
       {
          message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
       }
-      stompSession.getSession().send(message);
+      stompSession.getSession().send(message, true);
       return null;
    }
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -140,7 +140,7 @@
    private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
 
    private final Executor threadPool;
-   
+
    private final ScheduledExecutorService scheduledThreadPool;
 
    private NotificationService notificationService;
@@ -148,13 +148,15 @@
    private VirtualExecutorService bossExecutor;
 
    private boolean paused;
-   
+
    private BatchFlusher flusher;
-   
+
    private ScheduledFuture<?> batchFlusherFuture;
-   
+
    private final long batchDelay;
 
+   private final boolean directDeliver;
+
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
                         final BufferDecoder decoder,
@@ -253,12 +255,16 @@
                                                                 configuration);
 
       this.threadPool = threadPool;
-      
+
       this.scheduledThreadPool = scheduledThreadPool;
-      
+
       batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
                                                        TransportConstants.DEFAULT_BATCH_DELAY,
                                                        configuration);
+
+      directDeliver = ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER,
+                                                             TransportConstants.DEFAULT_DIRECT_DELIVER,
+                                                             configuration);
    }
 
    public synchronized void start() throws Exception
@@ -418,15 +424,25 @@
          Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
          notificationService.sendNotification(notification);
       }
-      
+
       if (batchDelay > 0)
       {
          flusher = new BatchFlusher();
-         
-         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+
+         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher,
+                                                                         batchDelay,
+                                                                         batchDelay,
+                                                                         TimeUnit.MILLISECONDS);
       }
 
-      NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID + " " + host + ":" + port + " for " + protocol + " protocol");
+      NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID +
+                             " " +
+                             host +
+                             ":" +
+                             port +
+                             " for " +
+                             protocol +
+                             " protocol");
    }
 
    private void startServerChannels()
@@ -454,15 +470,15 @@
       {
          return;
       }
-      
+
       if (batchFlusherFuture != null)
       {
          batchFlusherFuture.cancel(false);
-         
+
          flusher.cancel();
-         
+
          flusher = null;
-         
+
          batchFlusherFuture = null;
       }
 
@@ -589,7 +605,7 @@
       @Override
       public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
-         new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0);
+         new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
 
          SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
          if (sslHandler != null)
@@ -650,7 +666,7 @@
 
       }
    }
-   
+
    private class BatchFlusher implements Runnable
    {
       private boolean cancelled;

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -50,6 +50,8 @@
 
    private final boolean batchingEnabled;
    
+   private final boolean directDeliver;
+   
    private HornetQBuffer batchBuffer;
    
    private final Object writeLock = new Object();
@@ -58,7 +60,7 @@
 
    // Constructors --------------------------------------------------
 
-   public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled)
+   public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
    {
       this.channel = channel;
 
@@ -66,6 +68,8 @@
 
       this.batchingEnabled = batchingEnabled;
       
+      this.directDeliver = directDeliver;
+      
       listener.connectionCreated(this, ProtocolType.CORE);
    }
 
@@ -211,6 +215,11 @@
    {
       return channel.getRemoteAddress().toString();
    }
+   
+   public boolean isDirectDeliver()
+   {
+      return directDeliver;
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -470,7 +470,7 @@
             ch.getPipeline().get(HornetQChannelHandler.class).active = true;
          }
 
-         NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0);
+         NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
 
          return conn;
       }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -72,6 +72,8 @@
    
    public static final String BATCH_DELAY = "batch-delay";
    
+   public static final String DIRECT_DELIVER = "direct-deliver";
+   
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -120,6 +122,8 @@
    public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet";
    
    public static final long DEFAULT_BATCH_DELAY = 0;
+   
+   public static final boolean DEFAULT_DIRECT_DELIVER = true;
 
    public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
 
@@ -146,6 +150,7 @@
       allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
+      allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
 
       ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
 

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -48,9 +48,11 @@
    void removeConsumer(Consumer consumer) throws Exception;
 
    int getConsumerCount();
-
+   
    void addLast(MessageReference ref);
 
+   void addLast(MessageReference ref, boolean direct);
+
    void addFirst(MessageReference ref);
 
    void acknowledge(MessageReference ref) throws Exception;

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -99,7 +99,7 @@
 
    void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
 
-   void send(ServerMessage message) throws Exception;
+   void send(ServerMessage message, boolean direct) throws Exception;
 
    void sendLarge(byte[] largeMessageHeader) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -17,6 +17,7 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -25,7 +26,13 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.ResourceNames;
@@ -81,7 +88,7 @@
 
    private final SimpleString forwardingAddress;
 
-   private final java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
+   private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue<MessageReference>();
 
    private final Transformer transformer;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -86,6 +86,7 @@
       // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
 
       long id = storageManager.generateUniqueID();
+      
       ServerMessage copy = message.copy(id);
 
       // This will set the original MessageId, and the original address
@@ -98,7 +99,7 @@
          copy = transformer.transform(copy);
       }
 
-      postOffice.route(copy, context.getTransaction());
+      postOffice.route(copy, context.getTransaction(), false);
    }
 
    public SimpleString getRoutingName()

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -71,7 +71,7 @@
    }
 
    @Override
-   public synchronized void add(final MessageReference ref, final boolean first)
+   public synchronized void add(final MessageReference ref, final boolean first, final boolean direct)
    {
       SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
@@ -107,7 +107,7 @@
 
                map.put(prop, hr);
 
-               super.add(hr, first);
+               super.add(hr, first, direct);
             }
          }
          else
@@ -133,13 +133,13 @@
             {
                map.put(prop, (HolderReference)ref);
 
-               super.add(ref, first);
+               super.add(ref, first, direct);
             }
          }
       }
       else
       {
-         super.add(ref, first);
+         super.add(ref, first, direct);
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -82,7 +82,8 @@
 
    private final PostOffice postOffice;
 
-   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true, QueueImpl.NUM_PRIORITIES);
+   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true,
+                                                                                                                       QueueImpl.NUM_PRIORITIES);
 
    private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
 
@@ -228,17 +229,22 @@
    {
       return filter;
    }
-
+   
    public void addLast(final MessageReference ref)
    {
+      addLast(ref, false);
+   }
+
+   public void addLast(final MessageReference ref, final boolean direct)
+   {
       messagesAdded.incrementAndGet();
-    
-      add(ref, false);
+
+      add(ref, false, direct);
    }
 
    public void addFirst(final MessageReference ref)
    {
-      add(ref, true);
+      add(ref, true, false);
    }
 
    public void deliverAsync()
@@ -268,19 +274,19 @@
    public synchronized void removeConsumer(final Consumer consumer) throws Exception
    {
       Iterator<ConsumerHolder> iter = consumerList.iterator();
-      
+
       while (iter.hasNext())
       {
          ConsumerHolder holder = iter.next();
-         
+
          if (holder.consumer == consumer)
          {
             iter.remove();
-            
+
             break;
          }
       }
-      
+
       if (pos > 0 && pos >= consumerList.size())
       {
          pos = consumerList.size() - 1;
@@ -297,7 +303,7 @@
             gids.add(groupID);
          }
       }
-      
+
       for (SimpleString gid : gids)
       {
          groups.remove(gid);
@@ -345,19 +351,19 @@
          redistributor = null;
 
          Iterator<ConsumerHolder> iter = consumerList.iterator();
-         
+
          while (iter.hasNext())
          {
             ConsumerHolder holder = iter.next();
-            
+
             if (holder.consumer == redistributor)
             {
                iter.remove();
-               
+
                break;
             }
          }
-         
+
          if (pos > 0 && pos >= consumerList.size())
          {
             pos = consumerList.size() - 1;
@@ -854,7 +860,7 @@
          {
             iter.remove();
             ref.getMessage().setPriority(newPriority);
-            addLast(ref);
+            addLast(ref, false);
             return true;
          }
       }
@@ -875,7 +881,7 @@
             count++;
             iter.remove();
             ref.getMessage().setPriority(newPriority);
-            addLast(ref);
+            addLast(ref, false);
          }
       }
       return count;
@@ -981,7 +987,7 @@
 
       copyMessage.setAddress(toAddress);
 
-      postOffice.route(copyMessage, tx);
+      postOffice.route(copyMessage, tx, false);
 
       acknowledge(tx, ref);
    }
@@ -1070,7 +1076,7 @@
 
       copyMessage.setAddress(address);
 
-      postOffice.route(copyMessage, tx);
+      postOffice.route(copyMessage, tx, false);
 
       acknowledge(tx, ref);
 
@@ -1083,15 +1089,7 @@
       {
          return;
       }
-      
-      // Disadvantage of this algorithm is that if there are many consumers which are busy a lot of the
-      // time, then they get tried with a message each time, and the message put back on the queue, which
-      // is inefficient
 
-      // This represents the number of consumers that are unavailable to take a message due either to
-      // there not being any messages available for its iterator/in queue or it's busy
-      // int unavailableCount = 0;
-
       int busyCount = 0;
 
       int nullRefCount = 0;
@@ -1099,10 +1097,10 @@
       int size = consumerList.size();
 
       int startPos = pos;
-      
+
       // Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
       int loop = Math.min(messageReferences.size(), 1000);
-      
+
       for (int i = 0; i < loop; i++)
       {
          ConsumerHolder holder = consumerList.get(pos);
@@ -1119,7 +1117,7 @@
          {
             ref = holder.iter.next();
          }
-         
+
          if (ref == null)
          {
             nullRefCount++;
@@ -1137,21 +1135,21 @@
             }
 
             Consumer groupConsumer = null;
-            
-            //If a group id is set, then this overrides the consumer chosen round-robin
-            
+
+            // If a group id is set, then this overrides the consumer chosen round-robin
+
             SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
 
             if (groupID != null)
             {
                groupConsumer = groups.get(groupID);
-               
+
                if (groupConsumer != null)
                {
                   consumer = groupConsumer;
                }
             }
-            
+
             HandleStatus status = handle(ref, consumer);
 
             if (status == HandleStatus.HANDLED)
@@ -1160,7 +1158,7 @@
                {
                   holder.iter.remove();
                }
-               
+
                if (groupID != null && groupConsumer == null)
                {
                   groups.put(groupID, consumer);
@@ -1186,14 +1184,14 @@
                   messageReferences.addFirst(ref, ref.getMessage().getPriority());
 
                   holder.iter = messageReferences.iterator();
-                  
-                  //Skip past the one we just put back
-                  
+
+                  // Skip past the one we just put back
+
                   holder.iter.next();
                }
             }
          }
-         
+
          pos++;
 
          if (pos == size)
@@ -1224,6 +1222,75 @@
 
    }
 
+   /*
+    * This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
+    */
+   private synchronized boolean deliverDirect(final MessageReference ref)
+   {
+      if (paused || consumerList.isEmpty())
+      {
+         return false;
+      }
+      
+      if (checkExpired(ref))
+      {
+         return true;
+      }
+      
+      int startPos = pos;
+      
+      int size = consumerList.size();
+
+      while (true)
+      {
+         ConsumerHolder holder = consumerList.get(pos);
+
+         Consumer consumer = holder.consumer;
+
+         Consumer groupConsumer = null;
+
+         // If a group id is set, then this overrides the consumer chosen round-robin
+
+         SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+         if (groupID != null)
+         {
+            groupConsumer = groups.get(groupID);
+
+            if (groupConsumer != null)
+            {
+               consumer = groupConsumer;
+            }
+         }
+         
+         pos++;
+
+         if (pos == size)
+         {
+            pos = 0;
+         }
+
+         HandleStatus status = handle(ref, consumer);
+
+         if (status == HandleStatus.HANDLED)
+         {            
+            if (groupID != null && groupConsumer == null)
+            {
+               groups.put(groupID, consumer);
+            }
+            
+            return true;
+         }
+
+         if (pos == startPos)
+         {
+            // Tried them all
+
+            return false;
+         }
+      }
+   }
+
    private boolean checkExpired(final MessageReference reference)
    {
       if (reference.getMessage().isExpired())
@@ -1247,15 +1314,23 @@
       }
    }
 
-   protected void add(final MessageReference ref, final boolean first)
+   protected void add(final MessageReference ref, final boolean first, final boolean direct)
    {
       if (scheduledDeliveryHandler.checkAndSchedule(ref))
       {
          return;
       }
 
+      if (direct && messageReferences.isEmpty())
+      {
+         if (deliverDirect(ref))
+         {
+            return;
+         }
+      }
+
       int refs;
-      
+
       if (first)
       {
          refs = messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1358,7 +1433,7 @@
       {
          for (MessageReference ref : refs)
          {
-            add(ref, true);
+            add(ref, true, false);
          }
 
          deliverAsync();

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -360,25 +360,7 @@
          // dies. It does not mean it will get deleted automatically when the
          // session is closed.
          // It is up to the user to delete the queue when finished with it
-
-         CloseListener closeListener = new CloseListener()
-         {
-            public void connectionClosed()
-            {
-               try
-               {
-                  if (postOffice.getBinding(name) != null)
-                  {
-                     postOffice.removeBinding(name);
-                  }
-               }
-               catch (Exception e)
-               {
-                  ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
-               }
-            }
-         };
-
+        
          TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
 
          remotingConnection.addCloseListener(cleaner);
@@ -988,7 +970,7 @@
       currentLargeMessage = msg;
    }
 
-   public void send(final ServerMessage message) throws Exception
+   public void send(final ServerMessage message, final boolean direct) throws Exception
    {
       long id = storageManager.generateUniqueID();
 
@@ -1016,11 +998,11 @@
       {
          // It's a management message
 
-         handleManagementMessage(message);
+         handleManagementMessage(message, direct);
       }
       else
       {
-         doSend(message);
+         doSend(message, direct);
       }
 
       if (defaultAddress == null)
@@ -1045,7 +1027,7 @@
       {
          currentLargeMessage.releaseResources();
 
-         doSend(currentLargeMessage);
+         doSend(currentLargeMessage, false);
 
          currentLargeMessage = null;
       }
@@ -1112,7 +1094,7 @@
       started = s;
    }
 
-   private void handleManagementMessage(final ServerMessage message) throws Exception
+   private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception
    {
       try
       {
@@ -1135,7 +1117,7 @@
       {
          reply.setAddress(replyTo);
 
-         doSend(reply);
+         doSend(reply, direct);
       }
    }
 
@@ -1171,7 +1153,7 @@
       }
    }
 
-   private void doSend(final ServerMessage msg) throws Exception
+   private void doSend(final ServerMessage msg, final boolean direct) throws Exception
    {
       // check the user has write access to this address.
       try
@@ -1200,7 +1182,7 @@
          routingContext.setTransaction(tx);
       }
 
-      postOffice.route(msg, routingContext);
+      postOffice.route(msg, routingContext, direct);
 
       routingContext.clear();
    }

Modified: trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -720,7 +720,7 @@
                                                      new SimpleString(notification.getUID()));
                }
 
-               postOffice.route(notificationMessage);
+               postOffice.route(notificationMessage, false);
             }
          }
       }

Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -150,7 +150,7 @@
 
             MessageReference ref = message.createReference(queue);
 
-            queue.addLast(ref);
+            queue.addLast(ref, false);
 
             refs.add(ref);
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -524,4 +524,10 @@
       return null;
    }
 
+   public void addLast(MessageReference ref, boolean direct)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -47,7 +47,7 @@
    public void testGetID() throws Exception
    {
       Channel channel = new SimpleChannel(RandomUtil.randomInt());
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
 
       Assert.assertEquals(channel.getId().intValue(), conn.getID());
    }
@@ -59,7 +59,7 @@
 
       Assert.assertEquals(0, channel.getWritten().size());
 
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
       conn.write(buff);
 
       Assert.assertEquals(1, channel.getWritten().size());
@@ -68,7 +68,7 @@
    public void testCreateBuffer() throws Exception
    {
       Channel channel = new SimpleChannel(RandomUtil.randomInt());
-      NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+      NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
 
       final int size = 1234;
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-05-18 12:00:15 UTC (rev 9242)
@@ -178,4 +178,22 @@
 
    }
 
+   public void route(ServerMessage message, boolean direct) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }
\ No newline at end of file



More information about the hornetq-commits mailing list