Author: timfox
Date: 2010-05-18 08:00:15 -0400 (Tue, 18 May 2010)
New Revision: 9242
Modified:
trunk/docs/user-manual/en/configuring-transports.xml
trunk/docs/user-manual/en/flow-control.xml
trunk/docs/user-manual/en/ha.xml
trunk/docs/user-manual/en/perf-tuning.xml
trunk/docs/user-manual/en/thread-pooling.xml
trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
added direct delivery plus some docs tweaks
Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml 2010-05-17 17:39:52 UTC (rev
9241)
+++ trunk/docs/user-manual/en/configuring-transports.xml 2010-05-18 12:00:15 UTC (rev
9242)
@@ -188,20 +188,23 @@
with firewall policies that typically only allow connections to be
initiated in one
direction.</para>
<para>All the valid Netty transport keys are defined in the class
<literal
-
>org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. The
- parameters can be used either with acceptors or connectors. The following
parameters
- can be used to configure Netty for simple TCP:</para>
+
>org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. Most
+ parameters can be used either with acceptors or connectors, some only
work with
+ acceptors. The following parameters can be used to configure Netty for
simple
+ TCP:</para>
<itemizedlist>
<listitem>
<para><literal>use-nio</literal>. If this is
<literal>true</literal> then Java
non blocking NIO will be used. If set to
<literal>false</literal> than old
blocking Java IO will be used.</para>
- <para>We highly recommend that you use non blocking Java NIO.
Java NIO does not
- maintain a thread per connection so can scale to many more
concurrent
- connections than with old blocking IO. We recommend the usage of
Java 6 for
- NIO and the best scalability. The default value for this property
is
- <literal>true</literal> on the server side and
<literal>false</literal>
- on the client side.</para>
+ <para>If you require the server to handle many concurrent
connections, we highly
+ recommend that you use non blocking Java NIO. Java NIO does not
maintain a
+ thread per connection so can scale to many more concurrent
connections than
+ with old blocking IO. If you don't require the server to
handle many
+ concurrent connections, you might get slightly better performance
by using
+ old (blocking) IO. The default value for this property is
<literal
+ >false</literal> on both the server side and the
+ client side.</para>
</listitem>
<listitem>
<para><literal>host</literal>. This specifies the
host name or IP address to
@@ -233,8 +236,8 @@
is <literal>true</literal>.</para>
</listitem>
<listitem>
- <para><literal>tcp-send-buffer-size</literal>. This
parameter determines the size
- of the TCP send buffer in bytes. The default value for this
property is
+ <para><literal>tcp-send-buffer-size</literal>. This
parameter determines the
+ size of the TCP send buffer in bytes. The default value for this
property is
<literal>32768</literal> bytes
(32KiB).</para>
<para>TCP buffer sizes should be tuned according to the
bandwidth and latency of
your network. Here's a good link that explains the theory
behind <ulink
@@ -254,6 +257,37 @@
size of the TCP receive buffer in bytes. The default value for
this property
is <literal>32768</literal> bytes
(32KiB).</para>
</listitem>
+ <listitem>
+ <para><literal>batch-delay</literal>. Before
writing packets to the transport,
+ HornetQ can be configured to batch up writes for a maximum of
<literal
+ >batch-delay</literal> milliseconds. This can
increase overall
+ throughput for very small messages. It does so at the expense of
an increase
+ in average latency for message transfer. The default value for
this property
+ is <literal>0</literal> ms.</para>
+ </listitem>
+ <listitem>
+ <para><literal>direct-deliver</literal>. When a
message arrives on the server
+ and is delivered to waiting consumers, by default, the delivery
is done on a
+ different thread to that which the message arrived on. This gives
the best
+ overall throughput and scalability, especially on multi-core
machines.
+ However it also introduces some extra latency due to the extra
context
+ switch required. If you want the lowest latency at the possible
expense of
+ some reduction in throughput then you can set <literal
+ >direct-deliver</literal> to true. The default value
for this parameter
+ is <literal>true</literal>. If you are willing to
take some small extra hit
+ on latency but want the highest throughput set this parameter to
<literal
+ >false</literal>.</para>
+ </listitem>
+ <listitem>
+ <para><literal>nio-remoting-threads</literal>. When
configured to use NIO,
+ HornetQ will, by default, use a number of threads equal to three
times the
+ number of cores (or hyper-threads) as reported by <literal
+
>Runtime.getRuntime().availableProcessors()</literal> for processing
+ incoming packets. If you want to override this value, you can set
the number
+ of threads by specifying this parameter. The default value for
this
+ parameter is <literal>-1</literal> which means use
the value from <literal
+
>Runtime.getRuntime().availableProcessors()</literal> * 3.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
@@ -269,20 +303,20 @@
SSL.</para>
</listitem>
<listitem>
- <para><literal>key-store-path</literal>. This is
the path to the SSL key store on
- the client which holds the client certificates.</para>
+ <para><literal>key-store-path</literal>. This is
the path to the SSL key store
+ on the client which holds the client certificates.</para>
</listitem>
<listitem>
<para><literal>key-store-password</literal>. This
is the password for the client
certificate key store on the client.</para>
</listitem>
<listitem>
- <para><literal>trust-store-path</literal>. This is
the path to the trusted client
- certificate store on the server.</para>
+ <para><literal>trust-store-path</literal>. This is
the path to the trusted
+ client certificate store on the server.</para>
</listitem>
<listitem>
- <para><literal>trust-store-password</literal>. This
is the password to the trusted
- client certificate store on the server.</para>
+ <para><literal>trust-store-password</literal>. This
is the password to the
+ trusted client certificate store on the server.</para>
</listitem>
</itemizedlist>
</section>
@@ -303,8 +337,8 @@
before sending an empty http request to keep the connection
alive</para>
</listitem>
<listitem>
-
<para><literal>http-client-idle-scan-period</literal>. How often, in
milliseconds,
- to scan for idle clients</para>
+
<para><literal>http-client-idle-scan-period</literal>. How often, in
+ milliseconds, to scan for idle clients</para>
</listitem>
<listitem>
<para><literal>http-response-time</literal>. How
long the server can wait before
Modified: trunk/docs/user-manual/en/flow-control.xml
===================================================================
--- trunk/docs/user-manual/en/flow-control.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/flow-control.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -261,10 +261,12 @@
100000 bytes and would block any producers sending to that address to
prevent that
max size being exceeded.</para>
<para>Note the policy must be set to
<literal>BLOCK</literal> to enable blocking producer
- flow control.</para>
- <para>Please note the default value for
<literal>address-full-policy</literal> is to
- <literal>PAGE</literal> (see <xref
linkend="paging" /> for more information on
- paging).</para>
+ flow control.</para>
+ <note><para>Note that in the default configuration all addresses
are set to block producers after 10 MiB of message data
+ is in the address. This means you cannot send more than 10 MiB of message data
to an address without it being consumed before the producers
+ will be blocked. If you do not want this behaviour increase the
<literal>max-size-bytes</literal> parameter or change the
+ address full message policy.</para>
+ </note>
</section>
</section>
<section>
Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/ha.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -135,7 +135,9 @@
<section id="ha.mode.shared">
<title>Shared Store</title>
<para>When using a shared store, both live and backup servers share
the
- <emphasis>same</emphasis> journal using a shared file
system. </para>
+ <emphasis>same</emphasis> entire data directory using
a shared file system.
+ This includes the paging directory, journal directory, large messages
and binding
+ journal.</para>
<para>When failover occurs and the backup server takes over, it
will load the
persistent storage from the shared file system and clients can
connect to
it.</para>
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -99,6 +99,11 @@
really need durable messages then set them to be non-durable. Durable
messages
incur a lot more overhead in persisting them to
storage.</para>
</listitem>
+ <listitem>
+ <para>Batch many sends or acknowledgements in a single transaction.
HornetQ will
+ only require a network round trip on the commit, not on every send
or
+ acknowledgement.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
@@ -154,17 +159,18 @@
for more information.</para>
</listitem>
<listitem>
- <para>If you have very fast consumers, you can increase
consumer-window-size. This effectively disables consumer flow control.</para>
+ <para>If you have very fast consumers, you can increase
consumer-window-size. This
+ effectively disables consumer flow control.</para>
</listitem>
<listitem>
- <para>Socket NIO vs Socket Old IO. By default HornetQ uses Socket
NIO on the server
- and old (blocking) IO on the client side (see the chapter on
configuring
- transports for more information <xref
linkend="configuring-transports"/>). NIO
- is much more scalable but can give you some latency hit compared to
old blocking
- IO. If you expect to be able to service many thousands of connections
on the
- server, then continue to use NIO on the server. However, if don't
expect many
- thousands of connections on the server you can configure the server
acceptors to
- use old IO, and might get a small performance
advantage.</para>
+ <para>Socket NIO vs Socket Old IO. By default HornetQ uses old
(blocking) IO on the
+ server and the client side (see the chapter on configuring transports
for more
+ information <xref
linkend="configuring-transports"/>). NIO is much more scalable
+ but can give you some latency hit compared to old blocking IO. If you
need to be
+ able to service many thousands of connections on the server, then you
should
+ make sure you're using NIO on the server. However, if you don't
expect many
+ thousands of connections on the server you can keep the server
acceptors using
+ old IO, and might get a small performance advantage.</para>
</listitem>
<listitem>
<para>Use the core API not JMS. Using the JMS API you will have
slightly lower
@@ -180,7 +186,7 @@
</section>
<section>
<title>Tuning Transport Settings</title>
- <itemizedlist>
+ <itemizedlist>
<listitem>
<para>TCP buffer sizes. If you have a fast network and fast
machines you may get a
performance boost by increasing the TCP send and receive buffer
sizes. See the
@@ -202,13 +208,23 @@
This would allow up to 20000 file handles to be open by the user
<literal
serveruser</literal>. </para>
</listitem>
+ <listitem>
+ <para>Use <literal>batch-delay</literal> and set
<literal>direct-deliver</literal>
+ to false for the best throughput for very small messages. HornetQ
comes with a
+ preconfigured connector/acceptor pair
(<literal>netty-throughput</literal>) in
+ <literal>hornetq-configuration.xml</literal> and JMS
connection factory
+ (<literal>ThroughputConnectionFactory</literal>) in
<literal
+ >hornetq-jms.xml</literal> which can be used to give the
very best
+ throughput, especially for small messages. See the <xref
+ linkend="configuring-transports"/> for more
information on this.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
<title>Tuning the VM</title>
- <para>We highly recommend you use the latest Java JVM for the best
performance. We test internally using the
- Sun JVM, so some of these tunings won't apply to JDKs from other
providers (e.g. IBM or
- JRockit)</para>
+ <para>We highly recommend you use the latest Java JVM for the best
performance. We test
+ internally using the Sun JVM, so some of these tunings won't apply to
JDKs from other
+ providers (e.g. IBM or JRockit)</para>
<itemizedlist>
<listitem>
<para>Garbage collection. For smooth server operation we recommend
using a parallel
@@ -223,15 +239,6 @@
size and number of your messages. Use the JVM arguments
<literal>-Xms</literal>
and <literal>-Xmx</literal> to set server available RAM.
We recommend setting
them to the same high value.</para>
- <para>HornetQ will regularly sample JVM memory and reports if the
available memory
- is below a configurable threshold. Use this information to properly
set JVM
- memory and paging. The sample is disabled by default. To enabled it,
configure
- the sample frequency by setting
<literal>memory-measure-interval</literal> in
- <literal>hornetq-configuration.xml</literal> (in
milliseconds). When the
- available memory goes below the configured threshold, a warning is
logged. The
- threshold can be also configured by setting <literal
- >memory-warning-threshold</literal> in <literal
- >hornetq-configuration.xml</literal> (default is
25%).</para>
</listitem>
<listitem>
<para>Aggressive options. Different JVMs provide different sets of
JVM tuning
@@ -257,7 +264,10 @@
<note>
<para>Some popular libraries such as the Spring JMS Template
are known to use
these anti-patterns. If you're using Spring JMS Template and
you're getting
- poor performance you know why. Don't blame
HornetQ!</para>
+ poor performance you know why. Don't blame HornetQ! The
Spring JMS Template
+ can only safely be used in an app server which caches JMS
sessions (e.g.
+ using JCA), and only then for sending messages. It cannot
safely be used
+ for synchronously consuming messages, even in an app server.
</para>
</note>
</listitem>
<listitem>
Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -28,6 +28,14 @@
thread pool for scheduled use. A Java scheduled thread pool cannot be
configured to use
a standard thread pool, otherwise we could use a single thread pool for both
scheduled
and non scheduled activity.</para>
+ <para>When using old (blocking) IO, a separate thread pool is also used to
service connections. Since old IO requires a thread per connection
+ it does not make sense to get them from the standard pool as the pool will easily
get exhausted if too many connections are made, resulting in the
+ server "hanging" since it has no remaining threads to do anything
else.</para>
+ <para>When using new IO (NIO), HornetQ will, by default, use a number of
threads equal to three times the number of cores (or hyper-threads)
+ as reported by Runtime.getRuntime().availableProcessors() for processing
incoming packets.
+ If you want to override this value, you can set the number of threads by
specifying the parameter <literal>nio-remoting-threads</literal>
+ in the transport configuration. See the
+ <xref linkend="configuring-transports"/> for more information
on this.</para>
<para>There are also a small number of other places where threads are used
directly, we'll
discuss each in turn.</para>
<section id="server.scheduled.thread.pool">
@@ -60,7 +68,7 @@
bounded thread pool is used with caution since it can lead to dead-lock
situations
if the upper bound is chosen to be too low.</para>
<para>The default value for
<literal>thread-pool-max-size</literal> is <literal
- >-1</literal>, i.e. the thread pool is
unbounded.</para>
+ >30</literal>.</para>
<para>See the <ulink
url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Th...
J2SE javadoc</ulink> for more information on unbounded (cached),
and bounded
Modified: trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2010-05-18 12:00:15
UTC (rev 9242)
@@ -40,7 +40,7 @@
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
- <param key="batch-delay" value="50"/>
+ <param key="batch-delay" value="50"/>
</connector>
<connector name="in-vm">
@@ -62,6 +62,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2010-05-18
12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2010-05-18 12:00:15
UTC (rev 9242)
@@ -62,6 +62,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2010-05-18
12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2010-05-18 12:00:15
UTC (rev 9242)
@@ -62,6 +62,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2010-05-18
12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-05-18 12:00:15
UTC (rev 9242)
@@ -41,6 +41,7 @@
<param key="host"
value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-05-18
12:00:15 UTC (rev 9242)
@@ -39,6 +39,7 @@
<param key="host"
value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC
(rev 9241)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC
(rev 9242)
@@ -33,6 +33,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC
(rev 9241)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC
(rev 9242)
@@ -31,6 +31,7 @@
<param key="host"
value="${jboss.bind.address:localhost}"/>
<param key="port"
value="${hornetq.remoting.netty.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -838,6 +838,7 @@
for (int i = 0; i < paramsNodes.getLength(); i++)
{
Node paramNode = paramsNodes.item(i);
+
NamedNodeMap attributes = paramNode.getAttributes();
Node nkey = attributes.getNamedItem("key");
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-05-18 12:00:15
UTC (rev 9242)
@@ -985,7 +985,7 @@
}
- postOffice.route(message, depageTransaction);
+ postOffice.route(message, depageTransaction, false);
// This means the page is duplicated. So we need to ignore this
if (depageTransaction.getState() == State.ROLLBACK_ONLY)
Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2010-05-17 17:39:52 UTC
(rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2010-05-18 12:00:15 UTC
(rev 9242)
@@ -49,11 +49,11 @@
Bindings getMatchingBindings(SimpleString address);
- void route(ServerMessage message) throws Exception;
+ void route(ServerMessage message, boolean direct) throws Exception;
- void route(ServerMessage message, Transaction tx) throws Exception;
+ void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
- void route(ServerMessage message, RoutingContext context) throws Exception;
+ void route(ServerMessage message, RoutingContext context, boolean direct) throws
Exception;
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws
Exception;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -64,7 +64,6 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -529,17 +528,17 @@
return addressManager.getMatchingBindings(address);
}
- public void route(final ServerMessage message) throws Exception
+ public void route(final ServerMessage message, final boolean direct) throws Exception
{
- route(message, (Transaction)null);
+ route(message, (Transaction)null, direct);
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, final Transaction tx, final boolean
direct) throws Exception
{
- this.route(message, new RoutingContextImpl(tx));
+ route(message, new RoutingContextImpl(tx), direct);
}
- public void route(final ServerMessage message, final RoutingContext context) throws
Exception
+ public void route(final ServerMessage message, final RoutingContext context, final
boolean direct) throws Exception
{
// Sanity check
if (message.getRefCount() > 0)
@@ -656,13 +655,13 @@
message.setAddress(dlaAddress);
- route(message, context.getTransaction());
+ route(message, context.getTransaction(), false);
}
}
}
else
{
- processRoute(message, context);
+ processRoute(message, context, direct);
}
if (startedTx)
@@ -689,7 +688,7 @@
if (tx == null)
{
- queue.addLast(reference);
+ queue.addLast(reference, false);
}
else
{
@@ -717,7 +716,7 @@
if (routed)
{
- processRoute(message, context);
+ processRoute(message, context, false);
res = true;
}
@@ -778,7 +777,7 @@
message.setAddress(queueName);
message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
- routeDirect(message, queue, false);
+ routeQueueInfo(message, queue, false);
for (QueueInfo info : queueInfos.values())
{
@@ -793,7 +792,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING,
info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE,
info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
int consumersWithFilters = info.getFilterStrings() != null ?
info.getFilterStrings().size() : 0;
@@ -806,7 +805,7 @@
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME,
info.getRoutingName());
message.putIntProperty(ManagementHelper.HDR_DISTANCE,
info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
}
if (info.getFilterStrings() != null)
@@ -821,7 +820,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING,
filterString);
message.putIntProperty(ManagementHelper.HDR_DISTANCE,
info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
}
}
}
@@ -838,7 +837,7 @@
message.setPagingStore(store);
}
- private void routeDirect(final ServerMessage message, final Queue queue, final boolean
applyFilters) throws Exception
+ private void routeQueueInfo(final ServerMessage message, final Queue queue, final
boolean applyFilters) throws Exception
{
if (!applyFilters || queue.getFilter() == null ||
queue.getFilter().match(message))
{
@@ -846,11 +845,11 @@
queue.route(message, context);
- processRoute(message, context);
+ processRoute(message, context, false);
}
}
- private void processRoute(final ServerMessage message, final RoutingContext context)
throws Exception
+ private void processRoute(final ServerMessage message, final RoutingContext context,
final boolean direct) throws Exception
{
final List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -951,7 +950,7 @@
public void done()
{
- addReferences(refs);
+ addReferences(refs, direct);
}
});
}
@@ -960,11 +959,11 @@
/**
* @param refs
*/
- private void addReferences(final List<MessageReference> refs)
+ private void addReferences(final List<MessageReference> refs, final boolean
direct)
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
+ ref.getQueue().addLast(ref, direct);
}
}
@@ -985,7 +984,6 @@
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(),
50);
message.setAddress(queueName);
- // message.setDurable(true);
String uid = UUIDGenerator.getInstance().generateStringUUID();
@@ -1199,7 +1197,7 @@
// This could happen when the PageStore left the pageState
// TODO is this correct - don't we lose transactionality here???
- route(message);
+ route(message, false);
}
first = false;
}
@@ -1235,7 +1233,7 @@
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
+ ref.getQueue().addLast(ref, false);
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-17
17:39:52 UTC (rev 9241)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -92,10 +92,12 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.impl.netty.NettyConnection;
import org.hornetq.core.server.BindingQueryResult;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.remoting.Connection;
/**
* A ServerSessionPacketHandler
@@ -119,6 +121,8 @@
private final Channel channel;
private volatile CoreRemotingConnection remotingConnection;
+
+ private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
@@ -134,7 +138,19 @@
this.channel = channel;
this.remotingConnection = channel.getConnection();
-
+
+ //TODO think of a better way of doing this
+ Connection conn = remotingConnection.getTransportConnection();
+
+ if (conn instanceof NettyConnection)
+ {
+ direct = ((NettyConnection)conn).isDirectDeliver();
+ }
+ else
+ {
+ direct = false;
+ }
+
addConnectionListeners();
}
@@ -442,7 +458,7 @@
{
SessionSendMessage message = (SessionSendMessage)packet;
requiresResponse = message.isRequiresResponse();
- session.send((ServerMessage)message.getMessage());
+ session.send((ServerMessage)message.getMessage(), direct);
if (requiresResponse)
{
response = new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -535,7 +535,7 @@
{
message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
}
- stompSession.getSession().send(message);
+ stompSession.getSession().send(message, true);
return null;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -140,7 +140,7 @@
private final ConcurrentMap<Object, Connection> connections = new
ConcurrentHashMap<Object, Connection>();
private final Executor threadPool;
-
+
private final ScheduledExecutorService scheduledThreadPool;
private NotificationService notificationService;
@@ -148,13 +148,15 @@
private VirtualExecutorService bossExecutor;
private boolean paused;
-
+
private BatchFlusher flusher;
-
+
private ScheduledFuture<?> batchFlusherFuture;
-
+
private final long batchDelay;
+ private final boolean directDeliver;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
@@ -253,12 +255,16 @@
configuration);
this.threadPool = threadPool;
-
+
this.scheduledThreadPool = scheduledThreadPool;
-
+
batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
TransportConstants.DEFAULT_BATCH_DELAY,
configuration);
+
+ directDeliver = ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER,
+ TransportConstants.DEFAULT_DIRECT_DELIVER,
+ configuration);
}
public synchronized void start() throws Exception
@@ -418,15 +424,25 @@
Notification notification = new Notification(null,
NotificationType.ACCEPTOR_STARTED, props);
notificationService.sendNotification(notification);
}
-
+
if (batchDelay > 0)
{
flusher = new BatchFlusher();
-
- batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+
+ batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher,
+ batchDelay,
+ batchDelay,
+ TimeUnit.MILLISECONDS);
}
- NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID + " " + host + ":" + port + " for " + protocol + " protocol");
+ NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID +
+ " " +
+ host +
+ ":" +
+ port +
+ " for " +
+ protocol +
+ " protocol");
}
private void startServerChannels()
@@ -454,15 +470,15 @@
{
return;
}
-
+
if (batchFlusherFuture != null)
{
batchFlusherFuture.cancel(false);
-
+
flusher.cancel();
-
+
flusher = null;
-
+
batchFlusherFuture = null;
}
@@ -589,7 +605,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final
ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0);
+ new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -650,7 +666,7 @@
}
}
-
+
private class BatchFlusher implements Runnable
{
private boolean cancelled;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -50,6 +50,8 @@
private final boolean batchingEnabled;
+ private final boolean directDeliver;
+
private HornetQBuffer batchBuffer;
private final Object writeLock = new Object();
@@ -58,7 +60,7 @@
// Constructors --------------------------------------------------
- public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled)
+ public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
{
this.channel = channel;
@@ -66,6 +68,8 @@
this.batchingEnabled = batchingEnabled;
+ this.directDeliver = directDeliver;
+
listener.connectionCreated(this, ProtocolType.CORE);
}
@@ -211,6 +215,11 @@
{
return channel.getRemoteAddress().toString();
}
+
+ public boolean isDirectDeliver()
+ {
+ return directDeliver;
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -470,7 +470,7 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0);
+ NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
return conn;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -72,6 +72,8 @@
public static final String BATCH_DELAY = "batch-delay";
+ public static final String DIRECT_DELIVER = "direct-deliver";
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -120,6 +122,8 @@
public static final String DEFAULT_SERVLET_PATH =
"/messaging/HornetQServlet";
public static final long DEFAULT_BATCH_DELAY = 0;
+
+ public static final boolean DEFAULT_DIRECT_DELIVER = true;
public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
@@ -146,6 +150,7 @@
allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
+ allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -48,9 +48,11 @@
void removeConsumer(Consumer consumer) throws Exception;
int getConsumerCount();
-
+
void addLast(MessageReference ref);
+ void addLast(MessageReference ref, boolean direct);
+
void addFirst(MessageReference ref);
void acknowledge(MessageReference ref) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-17 17:39:52 UTC (rev
9241)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 12:00:15 UTC (rev
9242)
@@ -99,7 +99,7 @@
void sendContinuations(int packetSize, byte[] body, boolean continues) throws
Exception;
- void send(ServerMessage message) throws Exception;
+ void send(ServerMessage message, boolean direct) throws Exception;
void sendLarge(byte[] largeMessageHeader) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +26,13 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -81,7 +88,7 @@
private final SimpleString forwardingAddress;
- private final java.util.Queue<MessageReference> refs = new
LinkedList<MessageReference>();
+ private final java.util.Queue<MessageReference> refs = new
ConcurrentLinkedQueue<MessageReference>();
private final Transformer transformer;
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2010-05-17 17:39:52 UTC
(rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2010-05-18 12:00:15 UTC
(rev 9242)
@@ -86,6 +86,7 @@
// TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
long id = storageManager.generateUniqueID();
+
ServerMessage copy = message.copy(id);
// This will set the original MessageId, and the original address
@@ -98,7 +99,7 @@
copy = transformer.transform(copy);
}
- postOffice.route(copy, context.getTransaction());
+ postOffice.route(copy, context.getTransaction(), false);
}
public SimpleString getRoutingName()
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-05-18 12:00:15
UTC (rev 9242)
@@ -71,7 +71,7 @@
}
@Override
- public synchronized void add(final MessageReference ref, final boolean first)
+ public synchronized void add(final MessageReference ref, final boolean first, final boolean direct)
{
SimpleString prop =
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
@@ -107,7 +107,7 @@
map.put(prop, hr);
- super.add(hr, first);
+ super.add(hr, first, direct);
}
}
else
@@ -133,13 +133,13 @@
{
map.put(prop, (HolderReference)ref);
- super.add(ref, first);
+ super.add(ref, first, direct);
}
}
}
else
{
- super.add(ref, first);
+ super.add(ref, first, direct);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-05-17 17:39:52 UTC
(rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-05-18 12:00:15 UTC
(rev 9242)
@@ -82,7 +82,8 @@
private final PostOffice postOffice;
- private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true, QueueImpl.NUM_PRIORITIES);
+ private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true,
+ QueueImpl.NUM_PRIORITIES);
private final List<ConsumerHolder> consumerList = new
ArrayList<ConsumerHolder>();
@@ -228,17 +229,22 @@
{
return filter;
}
-
+
public void addLast(final MessageReference ref)
{
+ addLast(ref, false);
+ }
+
+ public void addLast(final MessageReference ref, final boolean direct)
+ {
messagesAdded.incrementAndGet();
-
- add(ref, false);
+
+ add(ref, false, direct);
}
public void addFirst(final MessageReference ref)
{
- add(ref, true);
+ add(ref, true, false);
}
public void deliverAsync()
@@ -268,19 +274,19 @@
public synchronized void removeConsumer(final Consumer consumer) throws Exception
{
Iterator<ConsumerHolder> iter = consumerList.iterator();
-
+
while (iter.hasNext())
{
ConsumerHolder holder = iter.next();
-
+
if (holder.consumer == consumer)
{
iter.remove();
-
+
break;
}
}
-
+
if (pos > 0 && pos >= consumerList.size())
{
pos = consumerList.size() - 1;
@@ -297,7 +303,7 @@
gids.add(groupID);
}
}
-
+
for (SimpleString gid : gids)
{
groups.remove(gid);
@@ -345,19 +351,19 @@
redistributor = null;
Iterator<ConsumerHolder> iter = consumerList.iterator();
-
+
while (iter.hasNext())
{
ConsumerHolder holder = iter.next();
-
+
if (holder.consumer == redistributor)
{
iter.remove();
-
+
break;
}
}
-
+
if (pos > 0 && pos >= consumerList.size())
{
pos = consumerList.size() - 1;
@@ -854,7 +860,7 @@
{
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref);
+ addLast(ref, false);
return true;
}
}
@@ -875,7 +881,7 @@
count++;
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref);
+ addLast(ref, false);
}
}
return count;
@@ -981,7 +987,7 @@
copyMessage.setAddress(toAddress);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, tx, false);
acknowledge(tx, ref);
}
@@ -1070,7 +1076,7 @@
copyMessage.setAddress(address);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, tx, false);
acknowledge(tx, ref);
@@ -1083,15 +1089,7 @@
{
return;
}
-
- // Disadvantage of this algorithm is that if there are many consumers which are busy a lot of the
- // time, then they get tried with a message each time, and the message put back on the queue, which
- // is inefficient
- // This represents the number of consumers that are unavailable to take a message due either to
- // there not being any messages available for its iterator/in queue or it's busy
- // int unavailableCount = 0;
-
int busyCount = 0;
int nullRefCount = 0;
@@ -1099,10 +1097,10 @@
int size = consumerList.size();
int startPos = pos;
-
+
// Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
int loop = Math.min(messageReferences.size(), 1000);
-
+
for (int i = 0; i < loop; i++)
{
ConsumerHolder holder = consumerList.get(pos);
@@ -1119,7 +1117,7 @@
{
ref = holder.iter.next();
}
-
+
if (ref == null)
{
nullRefCount++;
@@ -1137,21 +1135,21 @@
}
Consumer groupConsumer = null;
-
- //If a group id is set, then this overrides the consumer chosen round-robin
-
+
+ // If a group id is set, then this overrides the consumer chosen round-robin
+
SimpleString groupID =
ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
if (groupID != null)
{
groupConsumer = groups.get(groupID);
-
+
if (groupConsumer != null)
{
consumer = groupConsumer;
}
}
-
+
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED)
@@ -1160,7 +1158,7 @@
{
holder.iter.remove();
}
-
+
if (groupID != null && groupConsumer == null)
{
groups.put(groupID, consumer);
@@ -1186,14 +1184,14 @@
messageReferences.addFirst(ref, ref.getMessage().getPriority());
holder.iter = messageReferences.iterator();
-
- //Skip past the one we just put back
-
+
+ // Skip past the one we just put back
+
holder.iter.next();
}
}
}
-
+
pos++;
if (pos == size)
@@ -1224,6 +1222,75 @@
}
+ /*
+ * This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
+ */
+ private synchronized boolean deliverDirect(final MessageReference ref)
+ {
+ if (paused || consumerList.isEmpty())
+ {
+ return false;
+ }
+
+ if (checkExpired(ref))
+ {
+ return true;
+ }
+
+ int startPos = pos;
+
+ int size = consumerList.size();
+
+ while (true)
+ {
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ Consumer groupConsumer = null;
+
+ // If a group id is set, then this overrides the consumer chosen round-robin
+
+ SimpleString groupID =
ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
+
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ pos++;
+
+ if (pos == size)
+ {
+ pos = 0;
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ return true;
+ }
+
+ if (pos == startPos)
+ {
+ // Tried them all
+
+ return false;
+ }
+ }
+ }
+
private boolean checkExpired(final MessageReference reference)
{
if (reference.getMessage().isExpired())
@@ -1247,15 +1314,23 @@
}
}
- protected void add(final MessageReference ref, final boolean first)
+ protected void add(final MessageReference ref, final boolean first, final boolean direct)
{
if (scheduledDeliveryHandler.checkAndSchedule(ref))
{
return;
}
+ if (direct && messageReferences.isEmpty())
+ {
+ if (deliverDirect(ref))
+ {
+ return;
+ }
+ }
+
int refs;
-
+
if (first)
{
refs = messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1358,7 +1433,7 @@
{
for (MessageReference ref : refs)
{
- add(ref, true);
+ add(ref, true, false);
}
deliverAsync();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-17 17:39:52
UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 12:00:15
UTC (rev 9242)
@@ -360,25 +360,7 @@
// dies. It does not mean it will get deleted automatically when the
// session is closed.
// It is up to the user to delete the queue when finished with it
-
- CloseListener closeListener = new CloseListener()
- {
- public void connectionClosed()
- {
- try
- {
- if (postOffice.getBinding(name) != null)
- {
- postOffice.removeBinding(name);
- }
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
- }
- }
- };
-
+
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
remotingConnection.addCloseListener(cleaner);
@@ -988,7 +970,7 @@
currentLargeMessage = msg;
}
- public void send(final ServerMessage message) throws Exception
+ public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
@@ -1016,11 +998,11 @@
{
// It's a management message
- handleManagementMessage(message);
+ handleManagementMessage(message, direct);
}
else
{
- doSend(message);
+ doSend(message, direct);
}
if (defaultAddress == null)
@@ -1045,7 +1027,7 @@
{
currentLargeMessage.releaseResources();
- doSend(currentLargeMessage);
+ doSend(currentLargeMessage, false);
currentLargeMessage = null;
}
@@ -1112,7 +1094,7 @@
started = s;
}
- private void handleManagementMessage(final ServerMessage message) throws Exception
+ private void handleManagementMessage(final ServerMessage message, final boolean
direct) throws Exception
{
try
{
@@ -1135,7 +1117,7 @@
{
reply.setAddress(replyTo);
- doSend(reply);
+ doSend(reply, direct);
}
}
@@ -1171,7 +1153,7 @@
}
}
- private void doSend(final ServerMessage msg) throws Exception
+ private void doSend(final ServerMessage msg, final boolean direct) throws Exception
{
// check the user has write access to this address.
try
@@ -1200,7 +1182,7 @@
routingContext.setTransaction(tx);
}
- postOffice.route(msg, routingContext);
+ postOffice.route(msg, routingContext, direct);
routingContext.clear();
}
Modified:
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-05-17
17:39:52 UTC (rev 9241)
+++
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -720,7 +720,7 @@
new
SimpleString(notification.getUID()));
}
- postOffice.route(notificationMessage);
+ postOffice.route(notificationMessage, false);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -150,7 +150,7 @@
MessageReference ref = message.createReference(queue);
- queue.addLast(ref);
+ queue.addLast(ref, false);
refs.add(ref);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-05-17
17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -524,4 +524,10 @@
return null;
}
+ public void addLast(MessageReference ref, boolean direct)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-05-17
17:39:52 UTC (rev 9241)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -47,7 +47,7 @@
public void testGetID() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false,
false);
Assert.assertEquals(channel.getId().intValue(), conn.getID());
}
@@ -59,7 +59,7 @@
Assert.assertEquals(0, channel.getWritten().size());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false,
false);
conn.write(buff);
Assert.assertEquals(1, channel.getWritten().size());
@@ -68,7 +68,7 @@
public void testCreateBuffer() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false,
false);
final int size = 1234;
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-05-17
17:39:52 UTC (rev 9241)
+++
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-05-18
12:00:15 UTC (rev 9242)
@@ -178,4 +178,22 @@
}
+ public void route(ServerMessage message, boolean direct) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void route(ServerMessage message, RoutingContext context, boolean direct)
throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void route(ServerMessage message, Transaction tx, boolean direct) throws
Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file