JBoss hornetq SVN: r9244 - in trunk: src/main/org/hornetq/core/protocol/core and 4 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-05-18 12:30:21 -0400 (Tue, 18 May 2010)
New Revision: 9244
Modified:
trunk/docs/user-manual/en/paging.xml
trunk/docs/user-manual/en/thread-pooling.xml
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
Log:
https://jira.jboss.org/browse/HORNETQ-390 and some tweaks
Modified: trunk/docs/user-manual/en/paging.xml
===================================================================
--- trunk/docs/user-manual/en/paging.xml 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/paging.xml 2010-05-18 16:30:21 UTC (rev 9244)
@@ -155,6 +155,7 @@
able to continue sending.</para>
<para>To do this just set the <literal>address-full-policy</literal> to <literal
>BLOCK</literal> in the address settings</para>
+ <para>In the default configuration, all addresses are configured to block producers after 10 MiB of data are in the address.</para>
</section>
<section>
<title>Caution with Addresses with Multiple Queues</title>
Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 16:30:21 UTC (rev 9244)
@@ -28,14 +28,18 @@
thread pool for scheduled use. A Java scheduled thread pool cannot be configured to use
a standard thread pool, otherwise we could use a single thread pool for both scheduled
and non scheduled activity.</para>
- <para>When using old (blocking) IO, a separate thread pool is also used to service connections. Since old IO requires a thread per connection
- it does not make sense to get them from the standard pool as the pool will easily get exhausted if too many connections are made, resulting in the
- server "hanging" since it has no remaining threads to do anything else.</para>
- <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to three times the number of cores (or hyper-threads)
- as reported by Runtime.getRuntime().availableProcessors() for processing incoming packets.
- If you want to override this value, you can set the number of threads by specifying the parameter <literal>nio-remoting-threads</literal>
- in the transport configuration. See the
- <xref linkend="configuring-transports"/> for more information on this.</para>
+ <para>When using old (blocking) IO, a separate thread pool is also used to service
+ connections. Since old IO requires a thread per connection it does not make sense to get
+ them from the standard pool as the pool will easily get exhausted if too many
+ connections are made, resulting in the server "hanging" since it has no remaining
+ threads to do anything else. If you require the server to handle many concurrent
+ connections you should make sure you use NIO, not old IO.</para>
+ <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to
+ three times the number of cores (or hyper-threads) as reported by
+ Runtime.getRuntime().availableProcessors() for processing incoming packets. If you want
+ to override this value, you can set the number of threads by specifying the parameter
+ <literal>nio-remoting-threads</literal> in the transport configuration. See the
+ <xref linkend="configuring-transports"/> for more information on this.</para>
<para>There are also a small number of other places where threads are used directly, we'll
discuss each in turn.</para>
<section id="server.scheduled.thread.pool">
@@ -111,8 +115,9 @@
myFactory.setUseGlobalPools(false);
myFactory.setScheduledThreadPoolMaxSize(10);
myFactory.setThreadPoolMaxSize(-1); </programlisting>
- <para>If you're using the JMS API, you can set the same parameters on the ClientSessionFactory and use it to create the <literal
- >ConnectionFactory</literal> instance, for example:</para>
+ <para>If you're using the JMS API, you can set the same parameters on the
+ ClientSessionFactory and use it to create the <literal>ConnectionFactory</literal>
+ instance, for example:</para>
<programlisting>ConnectionFactory myConnectionFactory = HornetQJMSClient.createConnectionFactory(myFactory); </programlisting>
<para>If you're using JNDI to instantiate <literal>HornetQConnectionFactory</literal>
instances, you can also set these parameters in the <literal>hornetq-jms.xml</literal>
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -165,7 +165,7 @@
try
{
- session.close();
+ session.close(true);
}
catch (Exception e)
{
@@ -181,7 +181,7 @@
try
{
- session.close();
+ session.close(false);
}
catch (Exception e)
{
@@ -422,7 +422,7 @@
case SESS_CLOSE:
{
requiresResponse = true;
- session.close();
+ session.close(false);
removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -54,7 +54,7 @@
// TODO use same value than HornetQConnection
private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -108,7 +108,7 @@
this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
-
+
// ProtocolManager implementation --------------------------------
public ConnectionEntry createConnectionEntry(final Connection connection)
@@ -131,26 +131,25 @@
if (frame == null)
{
return -1;
- }
+ }
else
{
return buffer.readerIndex() - start;
}
}
-
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
try
{
doHandleBuffer(connection, buffer);
- }
+ }
finally
{
server.getStorageManager().clearContext();
}
}
-
+
private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
StompConnection conn = (StompConnection)connection;
@@ -162,7 +161,7 @@
{
log.trace("received " + request);
}
-
+
String command = request.getCommand();
StompFrame response = null;
@@ -269,6 +268,7 @@
}
}
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -290,11 +290,12 @@
}
if (noLocal)
{
- String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
+ String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
if (selector == null)
{
- selector = noLocalFilter;
- } else
+ selector = noLocalFilter;
+ }
+ else
{
selector += " AND " + noLocalFilter;
}
@@ -325,7 +326,13 @@
}
long consumerID = server.getStorageManager().generateUniqueID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
- stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName, destination, selector, ack);
+ stompSession.addSubscription(consumerID,
+ subscriptionID,
+ clientID,
+ durableSubscriptionName,
+ destination,
+ selector,
+ ack);
return null;
}
@@ -423,7 +430,7 @@
}
StompSession session = getTransactedSession(connection, txID);
-
+
if (session == null)
{
throw new StompException("No transaction started: " + txID);
@@ -567,15 +574,16 @@
{
connection.setValid(false);
- try {
+ try
+ {
StompSession session = sessions.remove(connection.getID());
if (session != null)
{
try
{
session.getSession().rollback(true);
- session.getSession().close();
- }
+ session.getSession().close(false);
+ }
catch (Exception e)
{
log.warn(e.getMessage(), e);
@@ -593,7 +601,7 @@
try
{
serverSession.rollback(true);
- serverSession.close();
+ serverSession.close(false);
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -28,9 +28,9 @@
{
long getID();
- void close() throws Exception;
+ void close(boolean failed) throws Exception;
- List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
+ List<MessageReference> cancelRefs(boolean failed, boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
void setStarted(boolean started);
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -107,7 +107,7 @@
void requestProducerCredits(SimpleString address, int credits) throws Exception;
- void close() throws Exception;
+ void close(boolean failed) throws Exception;
void setTransferring(boolean transferring);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -262,7 +262,7 @@
return filter;
}
- public void close() throws Exception
+ public void close(final boolean failed) throws Exception
{
setStarted(false);
@@ -278,7 +278,7 @@
session.removeConsumer(id);
- LinkedList<MessageReference> refs = cancelRefs(false, null);
+ LinkedList<MessageReference> refs = cancelRefs(failed, false, null);
Iterator<MessageReference> iter = refs.iterator();
@@ -356,7 +356,7 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
@@ -374,7 +374,13 @@
}
else
{
- ref.decrementDeliveryCount();
+ if (!failed)
+ {
+ //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
+ //before failure
+ log.info("decrementing delivery count");
+ ref.decrementDeliveryCount();
+ }
refs.add(ref);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -241,20 +241,20 @@
}
}
- private synchronized void doClose() throws Exception
+ private synchronized void doClose(final boolean failed) throws Exception
{
if (tx != null && tx.getXid() == null)
{
// We only rollback local txs on close, not XA tx branches
- rollback(false);
+ rollback(failed);
}
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
for (ServerConsumer consumer : consumersClone)
{
- consumer.close();
+ consumer.close(failed);
}
consumers.clear();
@@ -905,7 +905,7 @@
setStarted(false);
}
- public void close()
+ public void close(final boolean failed)
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -917,7 +917,7 @@
{
try
{
- doClose();
+ doClose(failed);
}
catch (Exception e)
{
@@ -933,7 +933,7 @@
if (consumer != null)
{
- consumer.close();
+ consumer.close(false);
}
else
{
@@ -1065,7 +1065,7 @@
{
ServerSessionImpl.log.warn("Client connection failed, clearing up resources for session " + name);
- close();
+ close(true);
ServerSessionImpl.log.warn("Cleared up resources for session " + name);
}
@@ -1134,7 +1134,7 @@
consumer.setStarted(false);
}
- toCancel.addAll(consumer.cancelRefs(lastMessageAsDelived, theTx));
+ toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
for (MessageReference ref : toCancel)
Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -61,8 +61,7 @@
{
assertActiveConnections(0);
- // spawn a JVM that creates a Core client, which waits to receive a test
- // message
+ // spawn a JVM that creates a Core client, which sends a message
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
ClientSession session = sf.createSession(false, true, true);
@@ -72,7 +71,7 @@
session.start();
- // send the message to the queue
+ // receive a message from the queue
Message messageFromClient = consumer.receive(5000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
@@ -110,7 +109,42 @@
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
assertActiveSession(0);
}
+
+ public void testCrashClient2() throws Exception
+ {
+ assertActiveConnections(0);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
+
+ // spawn a JVM that creates a Core client, which sends a message
+ Process p = SpawnedVMSupport.spawnVM(CrashClient2.class.getName());
+
+ ClientCrashTest.log.debug("waiting for the client VM to crash ...");
+ p.waitFor();
+
+ Assert.assertEquals(9, p.exitValue());
+
+ System.out.println("VM Exited");
+
+ Thread.sleep(3 * ClientCrashTest.CONNECTION_TTL);
+
+ ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
+
+ session.start();
+
+ // receive a message from the queue
+ ClientMessage messageFromClient = consumer.receive(10000);
+ Assert.assertNotNull("no message received", messageFromClient);
+ Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
+
+ assertEquals(2, messageFromClient.getDeliveryCount());
+
+ session.close();
+
+ }
+
// Package protected ---------------------------------------------
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-05-18 16:30:21 UTC (rev 9244)
@@ -23,8 +23,6 @@
/**
* Code to be run in an external VM, via main().
*
- * This client will open a connection, receive a message and crash.
- *
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*
14 years, 7 months
JBoss hornetq SVN: r9243 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-18 09:38:01 -0400 (Tue, 18 May 2010)
New Revision: 9243
Modified:
trunk/native/bin/libHornetQAIO32.so
Log:
32 bits recompilation
Modified: trunk/native/bin/libHornetQAIO32.so
===================================================================
(Binary files differ)
14 years, 7 months
JBoss hornetq SVN: r9242 - in trunk: src/config/jboss-as-4/clustered and 24 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-05-18 08:00:15 -0400 (Tue, 18 May 2010)
New Revision: 9242
Modified:
trunk/docs/user-manual/en/configuring-transports.xml
trunk/docs/user-manual/en/flow-control.xml
trunk/docs/user-manual/en/ha.xml
trunk/docs/user-manual/en/perf-tuning.xml
trunk/docs/user-manual/en/thread-pooling.xml
trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
added direct delivery plus some docs tweaks
Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/configuring-transports.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -188,20 +188,23 @@
with firewall policies that typically only allow connections to be initiated in one
direction.</para>
<para>All the valid Netty transport keys are defined in the class <literal
- >org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. The
- parameters can be used either with acceptors or connectors. The following parameters
- can be used to configure Netty for simple TCP:</para>
+ >org.hornetq.core.remoting.impl.netty.TransportConstants</literal>. Most
+ parameters can be used either with acceptors or connectors, some only work with
+ acceptors. The following parameters can be used to configure Netty for simple
+ TCP:</para>
<itemizedlist>
<listitem>
<para><literal>use-nio</literal>. If this is <literal>true</literal> then Java
non blocking NIO will be used. If set to <literal>false</literal> than old
blocking Java IO will be used.</para>
- <para>We highly recommend that you use non blocking Java NIO. Java NIO does not
- maintain a thread per connection so can scale to many more concurrent
- connections than with old blocking IO. We recommend the usage of Java 6 for
- NIO and the best scalability. The default value for this property is
- <literal>true</literal> on the server side and <literal>false</literal>
- on the client side.</para>
+ <para>If you require the server to handle many concurrent connections, we highly
+ recommend that you use non blocking Java NIO. Java NIO does not maintain a
+ thread per connection so can scale to many more concurrent connections than
+ with old blocking IO. If you don't require the server to handle many
+ concurrent connections, you might get slightly better performance by using
+ old (blocking) IO. The default value for this property is <literal
+ >false</literal> on the server side and <literal>false</literal> on the
+ client side.</para>
</listitem>
<listitem>
<para><literal>host</literal>. This specifies the host name or IP address to
@@ -233,8 +236,8 @@
is <literal>true</literal>.</para>
</listitem>
<listitem>
- <para><literal>tcp-send-buffer-size</literal>. This parameter determines the size
- of the TCP send buffer in bytes. The default value for this property is
+ <para><literal>tcp-send-buffer-size</literal>. This parameter determines the
+ size of the TCP send buffer in bytes. The default value for this property is
<literal>32768</literal> bytes (32KiB).</para>
<para>TCP buffer sizes should be tuned according to the bandwidth and latency of
your network. Here's a good link that explains the theory behind <ulink
@@ -254,6 +257,37 @@
size of the TCP receive buffer in bytes. The default value for this property
is <literal>32768</literal> bytes (32KiB).</para>
</listitem>
+ <listitem>
+ <para><literal>batch-delay</literal>. Before writing packets to the transport,
+ HornetQ can be configured to batch up writes for a maximum of <literal
+ >batch-delay</literal> milliseconds. This can increase overall
+ throughput for very small messages. It does so at the expense of an increase
+ in average latency for message transfer. The default value for this property
+ is <literal>0</literal> ms.</para>
+ </listitem>
+ <listitem>
+ <para><literal>direct-deliver</literal>. When a message arrives on the server
+ and is delivered to waiting consumers, by default, the delivery is done on a
+ different thread to that which the message arrived on. This gives the best
+ overall throughput and scalability, especially on multi-core machines.
+ However it also introduces some extra latency due to the extra context
+ switch required. If you want the lowest latency and the possible expense of
+ some reduction in throughput then you can make sure <literal
+ >direct-deliver</literal> to true. The default value for this parameter
+ is <literal>true</literal>. If you are willingh to take some small extra hit
+ on latency but want the highest throughput set this parameter to <literal
+ >false</literal>.</para>
+ </listitem>
+ <listitem>
+ <para><literal>nio-remoting-threads</literal>. When configured to use NIO,
+ HornetQ will, by default, use a number of threads equal to three times the
+ number of cores (or hyper-threads) as reported by <literal
+ >Runtime.getRuntime().availableProcessors()</literal> for processing
+ incoming packets. If you want to override this value, you can set the number
+ of threads by specifying this parameter. The default value for this
+ parameter is <literal>-1</literal> which means use the value from <literal
+ >Runtime.getRuntime().availableProcessors()</literal> * 3.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
@@ -269,20 +303,20 @@
SSL.</para>
</listitem>
<listitem>
- <para><literal>key-store-path</literal>. This is the path to the SSL key store on
- the client which holds the client certificates.</para>
+ <para><literal>key-store-path</literal>. This is the path to the SSL key store
+ on the client which holds the client certificates.</para>
</listitem>
<listitem>
<para><literal>key-store-password</literal>. This is the password for the client
certificate key store on the client.</para>
</listitem>
<listitem>
- <para><literal>trust-store-path</literal>. This is the path to the trusted client
- certificate store on the server.</para>
+ <para><literal>trust-store-path</literal>. This is the path to the trusted
+ client certificate store on the server.</para>
</listitem>
<listitem>
- <para><literal>trust-store-password</literal>. This is the password to the trusted
- client certificate store on the server.</para>
+ <para><literal>trust-store-password</literal>. This is the password to the
+ trusted client certificate store on the server.</para>
</listitem>
</itemizedlist>
</section>
@@ -303,8 +337,8 @@
before sending an empty http request to keep the connection alive</para>
</listitem>
<listitem>
- <para><literal>http-client-idle-scan-period</literal>. How often, in milliseconds,
- to scan for idle clients</para>
+ <para><literal>http-client-idle-scan-period</literal>. How often, in
+ milliseconds, to scan for idle clients</para>
</listitem>
<listitem>
<para><literal>http-response-time</literal>. How long the server can wait before
Modified: trunk/docs/user-manual/en/flow-control.xml
===================================================================
--- trunk/docs/user-manual/en/flow-control.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/flow-control.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -261,10 +261,12 @@
100000 bytes and would block any producers sending to that address to prevent that
max size being exceeded.</para>
<para>Note the policy must be set to <literal>BLOCK</literal> to enable blocking producer
- flow control.</para>
- <para>Please note the default value for <literal>address-full-policy</literal> is to
- <literal>PAGE</literal> (see <xref linkend="paging" /> for more information on
- paging).</para>
+ flow control.</para>
+ <note><para>Note that in the default configuration all addresses are set to block producers after 10 MiB of message data
+ is in the address. This means you cannot send more than 10MiB of message data to an address without it being consumed before the producers
+ will be blocked. If you do not want this behaviour increase the <literal>max-size-bytes</literal> parameter or change the
+ address full message policy.</para>
+ </note>
</section>
</section>
<section>
Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/ha.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -135,7 +135,9 @@
<section id="ha.mode.shared">
<title>Shared Store</title>
<para>When using a shared store, both live and backup servers share the
- <emphasis>same</emphasis> journal using a shared file system. </para>
+ <emphasis>same</emphasis> entire data directory using a shared file system.
+ This means the paging directory, journal directory, large messages and binding
+ journal.</para>
<para>When failover occurs and the backup server takes over, it will load the
persistent storage from the shared file system and clients can connect to
it.</para>
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -99,6 +99,11 @@
really need durable messages then set them to be non-durable. Durable messages
incur a lot more overhead in persisting them to storage.</para>
</listitem>
+ <listitem>
+ <para>Batch many sends or acknowledgements in a single transaction. HornetQ will
+ only require a network round trip on the commit, not on every send or
+ acknowledgement.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
@@ -154,17 +159,18 @@
for more information.</para>
</listitem>
<listitem>
- <para>If you have very fast consumers, you can increase consumer-window-size. This effectively disables consumer flow control.</para>
+ <para>If you have very fast consumers, you can increase consumer-window-size. This
+ effectively disables consumer flow control.</para>
</listitem>
<listitem>
- <para>Socket NIO vs Socket Old IO. By default HornetQ uses Socket NIO on the server
- and old (blocking) IO on the client side (see the chapter on configuring
- transports for more information <xref linkend="configuring-transports"/>). NIO
- is much more scalable but can give you some latency hit compared to old blocking
- IO. If you expect to be able to service many thousands of connections on the
- server, then continue to use NIO on the server. However, if don't expect many
- thousands of connections on the server you can configure the server acceptors to
- use old IO, and might get a small performance advantage.</para>
+ <para>Socket NIO vs Socket Old IO. By default HornetQ uses old (blocking) on the
+ server and the client side (see the chapter on configuring transports for more
+ information <xref linkend="configuring-transports"/>). NIO is much more scalable
+ but can give you some latency hit compared to old blocking IO. If you need to be
+ able to service many thousands of connections on the server, then you should
+ make sure you're using NIO on the server. However, if don't expect many
+ thousands of connections on the server you can keep the server acceptors using
+ old IO, and might get a small performance advantage.</para>
</listitem>
<listitem>
<para>Use the core API not JMS. Using the JMS API you will have slightly lower
@@ -180,7 +186,7 @@
</section>
<section>
<title>Tuning Transport Settings</title>
- <itemizedlist>
+ <itemizedlist>
<listitem>
<para>TCP buffer sizes. If you have a fast network and fast machines you may get a
performance boost by increasing the TCP send and receive buffer sizes. See the
@@ -202,13 +208,23 @@
This would allow up to 20000 file handles to be open by the user <literal
>serveruser</literal>. </para>
</listitem>
+ <listitem>
+ <para>Use <literal>batch-delay</literal> and set <literal>direct-deliver</literal>
+ to false for the best throughput for very small messages. HornetQ comes with a
+ preconfigured connector/acceptor pair (<literal>netty-throughput</literal>) in
+ <literal>hornetq-configuration.xml</literal> and JMS connection factory
+ (<literal>ThroughputConnectionFactory</literal>) in <literal
+ >hornetq-jms.xml</literal>which can be used to give the very best
+ throughput, especially for small messages. See the <xref
+ linkend="configuring-transports"/> for more information on this.</para>
+ </listitem>
</itemizedlist>
</section>
<section>
<title>Tuning the VM</title>
- <para>We highly recommend you use the latest Java JVM for the best performance. We test internally using the
- Sun JVM, so some of these tunings won't apply to JDKs from other providers (e.g. IBM or
- JRockit)</para>
+ <para>We highly recommend you use the latest Java JVM for the best performance. We test
+ internally using the Sun JVM, so some of these tunings won't apply to JDKs from other
+ providers (e.g. IBM or JRockit)</para>
<itemizedlist>
<listitem>
<para>Garbage collection. For smooth server operation we recommend using a parallel
@@ -223,15 +239,6 @@
size and number of your messages. Use the JVM arguments <literal>-Xms</literal>
and <literal>-Xmx</literal> to set server available RAM. We recommend setting
them to the same high value.</para>
- <para>HornetQ will regularly sample JVM memory and reports if the available memory
- is below a configurable threshold. Use this information to properly set JVM
- memory and paging. The sample is disabled by default. To enabled it, configure
- the sample frequency by setting <literal>memory-measure-interval</literal> in
- <literal>hornetq-configuration.xml</literal> (in milliseconds). When the
- available memory goes below the configured threshold, a warning is logged. The
- threshold can be also configured by setting <literal
- >memory-warning-threshold</literal> in <literal
- >hornetq-configuration.xml</literal> (default is 25%).</para>
</listitem>
<listitem>
<para>Aggressive options. Different JVMs provide different sets of JVM tuning
@@ -257,7 +264,10 @@
<note>
<para>Some popular libraries such as the Spring JMS Template are known to use
these anti-patterns. If you're using Spring JMS Template and you're getting
- poor performance you know why. Don't blame HornetQ!</para>
+ poor performance you know why. Don't blame HornetQ! The Spring JMS Template
+ can only safely be used in an app server which caches JMS sessions (e.g.
+ using JCA), and only then for sending messages. It cannot be safely be used
+ for synchronously consuming messages, even in an app server. </para>
</note>
</listitem>
<listitem>
Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/docs/user-manual/en/thread-pooling.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -28,6 +28,14 @@
thread pool for scheduled use. A Java scheduled thread pool cannot be configured to use
a standard thread pool, otherwise we could use a single thread pool for both scheduled
and non scheduled activity.</para>
+ <para>When using old (blocking) IO, a separate thread pool is also used to service connections. Since old IO requires a thread per connection
+ it does not make sense to get them from the standard pool as the pool will easily get exhausted if too many connections are made, resulting in the
+ server "hanging" since it has no remaining threads to do anything else.</para>
+ <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to three times the number of cores (or hyper-threads)
+ as reported by Runtime.getRuntime().availableProcessors() for processing incoming packets.
+ If you want to override this value, you can set the number of threads by specifying the parameter <literal>nio-remoting-threads</literal>
+ in the transport configuration. See the
+ <xref linkend="configuring-transports"/> for more information on this.</para>
<para>There are also a small number of other places where threads are used directly, we'll
discuss each in turn.</para>
<section id="server.scheduled.thread.pool">
@@ -60,7 +68,7 @@
bounded thread pool is used with caution since it can lead to dead-lock situations
if the upper bound is chosen to be too low.</para>
<para>The default value for <literal>thread-pool-max-size</literal> is <literal
- >-1</literal>, i.e. the thread pool is unbounded.</para>
+ >30</literal>.</para>
<para>See the <ulink
url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolEx..."
>J2SE javadoc</ulink> for more information on unbounded (cached), and bounded
Modified: trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -40,7 +40,7 @@
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
- <param key="batch-delay" value="50"/>
+ <param key="batch-delay" value="50"/>
</connector>
<connector name="in-vm">
@@ -62,6 +62,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -62,6 +62,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -62,6 +62,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -60,6 +60,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="in-vm">
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -41,6 +41,7 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -39,6 +39,7 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -33,6 +33,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-05-18 12:00:15 UTC (rev 9242)
@@ -31,6 +31,7 @@
<param key="host" value="${jboss.bind.address:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5455}"/>
<param key="batch-delay" value="50"/>
+ <param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -838,6 +838,7 @@
for (int i = 0; i < paramsNodes.getLength(); i++)
{
Node paramNode = paramsNodes.item(i);
+
NamedNodeMap attributes = paramNode.getAttributes();
Node nkey = attributes.getNamedItem("key");
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -985,7 +985,7 @@
}
- postOffice.route(message, depageTransaction);
+ postOffice.route(message, depageTransaction, false);
// This means the page is duplicated. So we need to ignore this
if (depageTransaction.getState() == State.ROLLBACK_ONLY)
Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -49,11 +49,11 @@
Bindings getMatchingBindings(SimpleString address);
- void route(ServerMessage message) throws Exception;
+ void route(ServerMessage message, boolean direct) throws Exception;
- void route(ServerMessage message, Transaction tx) throws Exception;
+ void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
- void route(ServerMessage message, RoutingContext context) throws Exception;
+ void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -64,7 +64,6 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -529,17 +528,17 @@
return addressManager.getMatchingBindings(address);
}
- public void route(final ServerMessage message) throws Exception
+ public void route(final ServerMessage message, final boolean direct) throws Exception
{
- route(message, (Transaction)null);
+ route(message, (Transaction)null, direct);
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception
{
- this.route(message, new RoutingContextImpl(tx));
+ route(message, new RoutingContextImpl(tx), direct);
}
- public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
{
// Sanity check
if (message.getRefCount() > 0)
@@ -656,13 +655,13 @@
message.setAddress(dlaAddress);
- route(message, context.getTransaction());
+ route(message, context.getTransaction(), false);
}
}
}
else
{
- processRoute(message, context);
+ processRoute(message, context, direct);
}
if (startedTx)
@@ -689,7 +688,7 @@
if (tx == null)
{
- queue.addLast(reference);
+ queue.addLast(reference, false);
}
else
{
@@ -717,7 +716,7 @@
if (routed)
{
- processRoute(message, context);
+ processRoute(message, context, false);
res = true;
}
@@ -778,7 +777,7 @@
message.setAddress(queueName);
message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
- routeDirect(message, queue, false);
+ routeQueueInfo(message, queue, false);
for (QueueInfo info : queueInfos.values())
{
@@ -793,7 +792,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
@@ -806,7 +805,7 @@
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
}
if (info.getFilterStrings() != null)
@@ -821,7 +820,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(message, queue, true);
+ routeQueueInfo(message, queue, true);
}
}
}
@@ -838,7 +837,7 @@
message.setPagingStore(store);
}
- private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
+ private void routeQueueInfo(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
{
if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message))
{
@@ -846,11 +845,11 @@
queue.route(message, context);
- processRoute(message, context);
+ processRoute(message, context, false);
}
}
- private void processRoute(final ServerMessage message, final RoutingContext context) throws Exception
+ private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
{
final List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -951,7 +950,7 @@
public void done()
{
- addReferences(refs);
+ addReferences(refs, direct);
}
});
}
@@ -960,11 +959,11 @@
/**
* @param refs
*/
- private void addReferences(final List<MessageReference> refs)
+ private void addReferences(final List<MessageReference> refs, final boolean direct)
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
+ ref.getQueue().addLast(ref, direct);
}
}
@@ -985,7 +984,6 @@
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
message.setAddress(queueName);
- // message.setDurable(true);
String uid = UUIDGenerator.getInstance().generateStringUUID();
@@ -1199,7 +1197,7 @@
// This could happen when the PageStore left the pageState
// TODO is this correct - don't we lose transactionality here???
- route(message);
+ route(message, false);
}
first = false;
}
@@ -1235,7 +1233,7 @@
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
+ ref.getQueue().addLast(ref, false);
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -92,10 +92,12 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.impl.netty.NettyConnection;
import org.hornetq.core.server.BindingQueryResult;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.remoting.Connection;
/**
* A ServerSessionPacketHandler
@@ -119,6 +121,8 @@
private final Channel channel;
private volatile CoreRemotingConnection remotingConnection;
+
+ private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
@@ -134,7 +138,19 @@
this.channel = channel;
this.remotingConnection = channel.getConnection();
-
+
+ //TODO think of a better way of doing this
+ Connection conn = remotingConnection.getTransportConnection();
+
+ if (conn instanceof NettyConnection)
+ {
+ direct = ((NettyConnection)conn).isDirectDeliver();
+ }
+ else
+ {
+ direct = false;
+ }
+
addConnectionListeners();
}
@@ -442,7 +458,7 @@
{
SessionSendMessage message = (SessionSendMessage)packet;
requiresResponse = message.isRequiresResponse();
- session.send((ServerMessage)message.getMessage());
+ session.send((ServerMessage)message.getMessage(), direct);
if (requiresResponse)
{
response = new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -535,7 +535,7 @@
{
message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
}
- stompSession.getSession().send(message);
+ stompSession.getSession().send(message, true);
return null;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -140,7 +140,7 @@
private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
private final Executor threadPool;
-
+
private final ScheduledExecutorService scheduledThreadPool;
private NotificationService notificationService;
@@ -148,13 +148,15 @@
private VirtualExecutorService bossExecutor;
private boolean paused;
-
+
private BatchFlusher flusher;
-
+
private ScheduledFuture<?> batchFlusherFuture;
-
+
private final long batchDelay;
+ private final boolean directDeliver;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
@@ -253,12 +255,16 @@
configuration);
this.threadPool = threadPool;
-
+
this.scheduledThreadPool = scheduledThreadPool;
-
+
batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
TransportConstants.DEFAULT_BATCH_DELAY,
configuration);
+
+ directDeliver = ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER,
+ TransportConstants.DEFAULT_DIRECT_DELIVER,
+ configuration);
}
public synchronized void start() throws Exception
@@ -418,15 +424,25 @@
Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
notificationService.sendNotification(notification);
}
-
+
if (batchDelay > 0)
{
flusher = new BatchFlusher();
-
- batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+
+ batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher,
+ batchDelay,
+ batchDelay,
+ TimeUnit.MILLISECONDS);
}
- NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID + " " + host + ":" + port + " for " + protocol + " protocol");
+ NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID +
+ " " +
+ host +
+ ":" +
+ port +
+ " for " +
+ protocol +
+ " protocol");
}
private void startServerChannels()
@@ -454,15 +470,15 @@
{
return;
}
-
+
if (batchFlusherFuture != null)
{
batchFlusherFuture.cancel(false);
-
+
flusher.cancel();
-
+
flusher = null;
-
+
batchFlusherFuture = null;
}
@@ -589,7 +605,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0);
+ new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -650,7 +666,7 @@
}
}
-
+
private class BatchFlusher implements Runnable
{
private boolean cancelled;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -50,6 +50,8 @@
private final boolean batchingEnabled;
+ private final boolean directDeliver;
+
private HornetQBuffer batchBuffer;
private final Object writeLock = new Object();
@@ -58,7 +60,7 @@
// Constructors --------------------------------------------------
- public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled)
+ public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
{
this.channel = channel;
@@ -66,6 +68,8 @@
this.batchingEnabled = batchingEnabled;
+ this.directDeliver = directDeliver;
+
listener.connectionCreated(this, ProtocolType.CORE);
}
@@ -211,6 +215,11 @@
{
return channel.getRemoteAddress().toString();
}
+
+ public boolean isDirectDeliver()
+ {
+ return directDeliver;
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -470,7 +470,7 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0);
+ NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
return conn;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -72,6 +72,8 @@
public static final String BATCH_DELAY = "batch-delay";
+ public static final String DIRECT_DELIVER = "direct-deliver";
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -120,6 +122,8 @@
public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet";
public static final long DEFAULT_BATCH_DELAY = 0;
+
+ public static final boolean DEFAULT_DIRECT_DELIVER = true;
public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
@@ -146,6 +150,7 @@
allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
+ allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -48,9 +48,11 @@
void removeConsumer(Consumer consumer) throws Exception;
int getConsumerCount();
-
+
void addLast(MessageReference ref);
+ void addLast(MessageReference ref, boolean direct);
+
void addFirst(MessageReference ref);
void acknowledge(MessageReference ref) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -99,7 +99,7 @@
void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
- void send(ServerMessage message) throws Exception;
+ void send(ServerMessage message, boolean direct) throws Exception;
void sendLarge(byte[] largeMessageHeader) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +26,13 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -81,7 +88,7 @@
private final SimpleString forwardingAddress;
- private final java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
+ private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue<MessageReference>();
private final Transformer transformer;
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -86,6 +86,7 @@
// TODO we can optimise this so it doesn't copy if it's not routed anywhere else
long id = storageManager.generateUniqueID();
+
ServerMessage copy = message.copy(id);
// This will set the original MessageId, and the original address
@@ -98,7 +99,7 @@
copy = transformer.transform(copy);
}
- postOffice.route(copy, context.getTransaction());
+ postOffice.route(copy, context.getTransaction(), false);
}
public SimpleString getRoutingName()
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -71,7 +71,7 @@
}
@Override
- public synchronized void add(final MessageReference ref, final boolean first)
+ public synchronized void add(final MessageReference ref, final boolean first, final boolean direct)
{
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
@@ -107,7 +107,7 @@
map.put(prop, hr);
- super.add(hr, first);
+ super.add(hr, first, direct);
}
}
else
@@ -133,13 +133,13 @@
{
map.put(prop, (HolderReference)ref);
- super.add(ref, first);
+ super.add(ref, first, direct);
}
}
}
else
{
- super.add(ref, first);
+ super.add(ref, first, direct);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -82,7 +82,8 @@
private final PostOffice postOffice;
- private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true, QueueImpl.NUM_PRIORITIES);
+ private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(true,
+ QueueImpl.NUM_PRIORITIES);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -228,17 +229,22 @@
{
return filter;
}
-
+
public void addLast(final MessageReference ref)
{
+ addLast(ref, false);
+ }
+
+ public void addLast(final MessageReference ref, final boolean direct)
+ {
messagesAdded.incrementAndGet();
-
- add(ref, false);
+
+ add(ref, false, direct);
}
public void addFirst(final MessageReference ref)
{
- add(ref, true);
+ add(ref, true, false);
}
public void deliverAsync()
@@ -268,19 +274,19 @@
public synchronized void removeConsumer(final Consumer consumer) throws Exception
{
Iterator<ConsumerHolder> iter = consumerList.iterator();
-
+
while (iter.hasNext())
{
ConsumerHolder holder = iter.next();
-
+
if (holder.consumer == consumer)
{
iter.remove();
-
+
break;
}
}
-
+
if (pos > 0 && pos >= consumerList.size())
{
pos = consumerList.size() - 1;
@@ -297,7 +303,7 @@
gids.add(groupID);
}
}
-
+
for (SimpleString gid : gids)
{
groups.remove(gid);
@@ -345,19 +351,19 @@
redistributor = null;
Iterator<ConsumerHolder> iter = consumerList.iterator();
-
+
while (iter.hasNext())
{
ConsumerHolder holder = iter.next();
-
+
if (holder.consumer == redistributor)
{
iter.remove();
-
+
break;
}
}
-
+
if (pos > 0 && pos >= consumerList.size())
{
pos = consumerList.size() - 1;
@@ -854,7 +860,7 @@
{
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref);
+ addLast(ref, false);
return true;
}
}
@@ -875,7 +881,7 @@
count++;
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref);
+ addLast(ref, false);
}
}
return count;
@@ -981,7 +987,7 @@
copyMessage.setAddress(toAddress);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, tx, false);
acknowledge(tx, ref);
}
@@ -1070,7 +1076,7 @@
copyMessage.setAddress(address);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, tx, false);
acknowledge(tx, ref);
@@ -1083,15 +1089,7 @@
{
return;
}
-
- // Disadvantage of this algorithm is that if there are many consumers which are busy a lot of the
- // time, then they get tried with a message each time, and the message put back on the queue, which
- // is inefficient
- // This represents the number of consumers that are unavailable to take a message due either to
- // there not being any messages available for its iterator/in queue or it's busy
- // int unavailableCount = 0;
-
int busyCount = 0;
int nullRefCount = 0;
@@ -1099,10 +1097,10 @@
int size = consumerList.size();
int startPos = pos;
-
+
// Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
int loop = Math.min(messageReferences.size(), 1000);
-
+
for (int i = 0; i < loop; i++)
{
ConsumerHolder holder = consumerList.get(pos);
@@ -1119,7 +1117,7 @@
{
ref = holder.iter.next();
}
-
+
if (ref == null)
{
nullRefCount++;
@@ -1137,21 +1135,21 @@
}
Consumer groupConsumer = null;
-
- //If a group id is set, then this overrides the consumer chosen round-robin
-
+
+ // If a group id is set, then this overrides the consumer chosen round-robin
+
SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
if (groupID != null)
{
groupConsumer = groups.get(groupID);
-
+
if (groupConsumer != null)
{
consumer = groupConsumer;
}
}
-
+
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED)
@@ -1160,7 +1158,7 @@
{
holder.iter.remove();
}
-
+
if (groupID != null && groupConsumer == null)
{
groups.put(groupID, consumer);
@@ -1186,14 +1184,14 @@
messageReferences.addFirst(ref, ref.getMessage().getPriority());
holder.iter = messageReferences.iterator();
-
- //Skip past the one we just put back
-
+
+ // Skip past the one we just put back
+
holder.iter.next();
}
}
}
-
+
pos++;
if (pos == size)
@@ -1224,6 +1222,75 @@
}
+ /*
+ * This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
+ */
+ private synchronized boolean deliverDirect(final MessageReference ref)
+ {
+ if (paused || consumerList.isEmpty())
+ {
+ return false;
+ }
+
+ if (checkExpired(ref))
+ {
+ return true;
+ }
+
+ int startPos = pos;
+
+ int size = consumerList.size();
+
+ while (true)
+ {
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ Consumer groupConsumer = null;
+
+ // If a group id is set, then this overrides the consumer chosen round-robin
+
+ SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
+
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ pos++;
+
+ if (pos == size)
+ {
+ pos = 0;
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ return true;
+ }
+
+ if (pos == startPos)
+ {
+ // Tried them all
+
+ return false;
+ }
+ }
+ }
+
private boolean checkExpired(final MessageReference reference)
{
if (reference.getMessage().isExpired())
@@ -1247,15 +1314,23 @@
}
}
- protected void add(final MessageReference ref, final boolean first)
+ protected void add(final MessageReference ref, final boolean first, final boolean direct)
{
if (scheduledDeliveryHandler.checkAndSchedule(ref))
{
return;
}
+ if (direct && messageReferences.isEmpty())
+ {
+ if (deliverDirect(ref))
+ {
+ return;
+ }
+ }
+
int refs;
-
+
if (first)
{
refs = messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1358,7 +1433,7 @@
{
for (MessageReference ref : refs)
{
- add(ref, true);
+ add(ref, true, false);
}
deliverAsync();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -360,25 +360,7 @@
// dies. It does not mean it will get deleted automatically when the
// session is closed.
// It is up to the user to delete the queue when finished with it
-
- CloseListener closeListener = new CloseListener()
- {
- public void connectionClosed()
- {
- try
- {
- if (postOffice.getBinding(name) != null)
- {
- postOffice.removeBinding(name);
- }
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
- }
- }
- };
-
+
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
remotingConnection.addCloseListener(cleaner);
@@ -988,7 +970,7 @@
currentLargeMessage = msg;
}
- public void send(final ServerMessage message) throws Exception
+ public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
@@ -1016,11 +998,11 @@
{
// It's a management message
- handleManagementMessage(message);
+ handleManagementMessage(message, direct);
}
else
{
- doSend(message);
+ doSend(message, direct);
}
if (defaultAddress == null)
@@ -1045,7 +1027,7 @@
{
currentLargeMessage.releaseResources();
- doSend(currentLargeMessage);
+ doSend(currentLargeMessage, false);
currentLargeMessage = null;
}
@@ -1112,7 +1094,7 @@
started = s;
}
- private void handleManagementMessage(final ServerMessage message) throws Exception
+ private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception
{
try
{
@@ -1135,7 +1117,7 @@
{
reply.setAddress(replyTo);
- doSend(reply);
+ doSend(reply, direct);
}
}
@@ -1171,7 +1153,7 @@
}
}
- private void doSend(final ServerMessage msg) throws Exception
+ private void doSend(final ServerMessage msg, final boolean direct) throws Exception
{
// check the user has write access to this address.
try
@@ -1200,7 +1182,7 @@
routingContext.setTransaction(tx);
}
- postOffice.route(msg, routingContext);
+ postOffice.route(msg, routingContext, direct);
routingContext.clear();
}
Modified: trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -720,7 +720,7 @@
new SimpleString(notification.getUID()));
}
- postOffice.route(notificationMessage);
+ postOffice.route(notificationMessage, false);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -150,7 +150,7 @@
MessageReference ref = message.createReference(queue);
- queue.addLast(ref);
+ queue.addLast(ref, false);
refs.add(ref);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -524,4 +524,10 @@
return null;
}
+ public void addLast(MessageReference ref, boolean direct)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -47,7 +47,7 @@
public void testGetID() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
Assert.assertEquals(channel.getId().intValue(), conn.getID());
}
@@ -59,7 +59,7 @@
Assert.assertEquals(0, channel.getWritten().size());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
conn.write(buff);
Assert.assertEquals(1, channel.getWritten().size());
@@ -68,7 +68,7 @@
public void testCreateBuffer() throws Exception
{
Channel channel = new SimpleChannel(RandomUtil.randomInt());
- NettyConnection conn = new NettyConnection(channel, new MyListener(), false);
+ NettyConnection conn = new NettyConnection(channel, new MyListener(), false, false);
final int size = 1234;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-05-17 17:39:52 UTC (rev 9241)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-05-18 12:00:15 UTC (rev 9242)
@@ -178,4 +178,22 @@
}
+ public void route(ServerMessage message, boolean direct) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
14 years, 7 months
JBoss hornetq SVN: r9241 - trunk/tests/src/org/hornetq/tests/unit/core/asyncio.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-17 13:39:52 -0400 (Mon, 17 May 2010)
New Revision: 9241
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
Log:
just a cleanup
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-05-17 17:27:15 UTC (rev 9240)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-05-17 17:39:52 UTC (rev 9241)
@@ -74,7 +74,8 @@
{
super.setUp();
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
- false, this.getClass().getClassLoader()));
+ false,
+ this.getClass().getClassLoader()));
executor = Executors.newSingleThreadExecutor();
}
@@ -100,54 +101,54 @@
}
}
-
+
public void testReleaseBuffers() throws Exception
{
AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 10000);
WeakReference<ByteBuffer> bufferCheck = new WeakReference<ByteBuffer>(controller.getHandler());
- controller.fill(0, 10, 1024, (byte)0);
-
+ controller.fill(0, 10, 1024, (byte)0);
+
ByteBuffer write = AsynchronousFileImpl.newBuffer(1024);
-
- for (int i = 0 ; i < 1024; i++)
+
+ for (int i = 0; i < 1024; i++)
{
- write.put(getSamplebyte(i));
+ write.put(UnitTestCase.getSamplebyte(i));
}
final CountDownLatch latch = new CountDownLatch(1);
-
+
controller.write(0, 1024, write, new AIOCallback()
{
-
- public void onError(int errorCode, String errorMessage)
+
+ public void onError(final int errorCode, final String errorMessage)
{
}
-
+
public void done()
{
latch.countDown();
}
});
-
- assertTrue(latch.await(10, TimeUnit.SECONDS));
-
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
WeakReference<ByteBuffer> bufferCheck2 = new WeakReference<ByteBuffer>(write);
-
+
AsynchronousFileImpl.destroyBuffer(write);
write = null;
-
- forceGC();
-
- assertNull(bufferCheck2.get());
-
+
+ UnitTestCase.forceGC();
+
+ Assert.assertNull(bufferCheck2.get());
+
controller.close();
controller = null;
-
- forceGC();
-
- assertNull(bufferCheck.get());
+
+ UnitTestCase.forceGC();
+
+ Assert.assertNull(bufferCheck.get());
}
public void testFileNonExistent() throws Exception
@@ -341,7 +342,7 @@
callbackLocal.latch.await();
- assertTrue(callbackLocal.error);
+ Assert.assertTrue(callbackLocal.error);
callbackLocal = new LocalCallback();
14 years, 7 months
JBoss hornetq SVN: r9240 - in trunk: tests/src/org/hornetq/tests/unit/core/asyncio and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-17 13:27:15 -0400 (Mon, 17 May 2010)
New Revision: 9240
Modified:
trunk/pom.xml
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
Log:
changing public repository in our pom
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-17 15:21:45 UTC (rev 9239)
+++ trunk/pom.xml 2010-05-17 17:27:15 UTC (rev 9240)
@@ -293,7 +293,7 @@
</releases>
<id>jboss.release</id>
<name>JBoss releases</name>
- <url>http://repository.jboss.org/maven2</url>
+ <url>https://repository.jboss.org/nexus/content/groups/public/</url>
</repository>
</repositories>
@@ -309,7 +309,7 @@
</releases>
<id>jboss.release</id>
<name>JBoss releases</name>
- <url>http://repository.jboss.org/maven2</url>
+ <url>https://repository.jboss.org/nexus/content/groups/public</url>
</pluginRepository>
</pluginRepositories>
<!--<profiles>
@@ -328,19 +328,6 @@
</profiles>-->
- <distributionManagement>
- <repository>
- <id>repository.jboss.org</id>
- <name>JBoss.org Release Distribution Repository</name>
- <url>dav:https://svn.jboss.org/repos/repository.jboss.org/maven2</url>
- </repository>
- <snapshotRepository>
- <id>snapshots.jboss.org</id>
- <name>JBoss.org Development Snapshot Repository</name>
- <url>dav:https://snapshots.jboss.org/maven2</url>
- </snapshotRepository>
- </distributionManagement>
-
</project>
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-05-17 15:21:45 UTC (rev 9239)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-05-17 17:27:15 UTC (rev 9240)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.asyncio;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@@ -22,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -98,7 +100,56 @@
}
}
+
+ public void testReleaseBuffers() throws Exception
+ {
+ AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ controller.open(FILE_NAME, 10000);
+ WeakReference<ByteBuffer> bufferCheck = new WeakReference<ByteBuffer>(controller.getHandler());
+ controller.fill(0, 10, 1024, (byte)0);
+
+ ByteBuffer write = AsynchronousFileImpl.newBuffer(1024);
+
+ for (int i = 0 ; i < 1024; i++)
+ {
+ write.put(getSamplebyte(i));
+ }
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ controller.write(0, 1024, write, new AIOCallback()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch.countDown();
+ }
+ });
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ WeakReference<ByteBuffer> bufferCheck2 = new WeakReference<ByteBuffer>(write);
+
+ AsynchronousFileImpl.destroyBuffer(write);
+
+ write = null;
+
+ forceGC();
+
+ assertNull(bufferCheck2.get());
+
+ controller.close();
+ controller = null;
+
+ forceGC();
+
+ assertNull(bufferCheck.get());
+ }
+
public void testFileNonExistent() throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
14 years, 7 months
JBoss hornetq SVN: r9239 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-17 11:21:45 -0400 (Mon, 17 May 2010)
New Revision: 9239
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-05-14 20:21:12 UTC (rev 9238)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-05-17 15:21:45 UTC (rev 9239)
@@ -35,7 +35,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
14 years, 7 months
JBoss hornetq SVN: r9238 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-14 16:21:12 -0400 (Fri, 14 May 2010)
New Revision: 9238
Modified:
trunk/native/bin/libHornetQAIO32.so
Log:
32 bits compilation
Modified: trunk/native/bin/libHornetQAIO32.so
===================================================================
(Binary files differ)
14 years, 7 months
JBoss hornetq SVN: r9237 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-14 15:54:12 -0400 (Fri, 14 May 2010)
New Revision: 9237
Modified:
trunk/native/bin/libHornetQAIO64.so
Log:
64 bits compilation
Modified: trunk/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
14 years, 7 months
JBoss hornetq SVN: r9236 - trunk/src/main/org/hornetq/core/asyncio/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-14 15:34:57 -0400 (Fri, 14 May 2010)
New Revision: 9236
Modified:
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-387
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-05-14 19:28:43 UTC (rev 9235)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-05-14 19:34:57 UTC (rev 9236)
@@ -51,7 +51,7 @@
/** This definition needs to match Version.h on the native sources.
Or else the native module won't be loaded because of version mismatches */
- private static int EXPECTED_NATIVE_VERSION = 28;
+ private static int EXPECTED_NATIVE_VERSION = 29;
/** Used to determine the next writing sequence */
private final AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -102,7 +102,7 @@
}
catch (Throwable e)
{
- AsynchronousFileImpl.log.trace(name + " -> error loading the native library", e);
+ AsynchronousFileImpl.log.debug(name + " -> error loading the native library", e);
return false;
}
@@ -163,7 +163,7 @@
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
*/
- private long handler;
+ private ByteBuffer handler;
// A context switch on AIO would make it to synchronize the disk before
// switching to the new thread, what would cause
@@ -258,13 +258,13 @@
stopPoller();
}
- AsynchronousFileImpl.closeInternal(handler);
- if (handler != 0)
+ if (handler != null)
{
+ AsynchronousFileImpl.closeInternal(handler);
AsynchronousFileImpl.addMax(-maxIO);
}
opened = false;
- handler = 0;
+ handler = null;
}
finally
{
@@ -416,7 +416,7 @@
}
/** Return the JNI handler used on C++ */
- public long getHandler()
+ public ByteBuffer getHandler()
{
return handler;
}
@@ -618,30 +618,30 @@
private static native ByteBuffer newNativeBuffer(long size);
- private static native long init(String fileName, int maxIO, Logger logger) throws HornetQException;
+ private static native ByteBuffer init(String fileName, int maxIO, Logger logger) throws HornetQException;
- private native long size0(long handle) throws HornetQException;
+ private native long size0(ByteBuffer handle) throws HornetQException;
- private native void write(long handle,
+ private native void write(ByteBuffer handle,
long sequence,
long position,
long size,
ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
- private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void read(ByteBuffer handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
- private static native void fill(long handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
+ private static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
- private static native void closeInternal(long handler) throws HornetQException;
+ private static native void closeInternal(ByteBuffer handler) throws HornetQException;
- private static native void stopPoller(long handler) throws HornetQException;
+ private static native void stopPoller(ByteBuffer handler) throws HornetQException;
/** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
private static native int getNativeVersion();
/** Poll asynchronous events from internal queues */
- private static native void internalPollEvents(long handler);
+ private static native void internalPollEvents(ByteBuffer handler);
// Inner classes ---------------------------------------------------------------------
14 years, 7 months
JBoss hornetq SVN: r9235 - trunk/native.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-05-14 15:28:43 -0400 (Fri, 14 May 2010)
New Revision: 9235
Modified:
trunk/native/
trunk/native/cleanup-native
Log:
cleanup
Property changes on: trunk/native
___________________________________________________________________
Name: svn:ignore
- *log
autom4te.cache
aclocal.m4
config.h
config.in
config.status
configure
libtool
Makefile
Makefile.in
stamp-h1
+ build-aux
*log
autom4te.cache
aclocal.m4
config.h
config.in
config.status
configure
libtool
Makefile
Makefile.in
stamp-h1
Modified: trunk/native/cleanup-native
===================================================================
--- trunk/native/cleanup-native 2010-05-14 19:26:58 UTC (rev 9234)
+++ trunk/native/cleanup-native 2010-05-14 19:28:43 UTC (rev 9235)
@@ -16,4 +16,5 @@
rm -r ./src/.libs
rm -r ./src/org_hornetq_core_asyncio_impl_AsynchronousFileImpl.h
rm -r ./src/disktest
+rm -r build-aux
14 years, 7 months