Author: clebert.suconic(a)jboss.com
Date: 2011-03-28 17:14:27 -0400 (Mon, 28 Mar 2011)
New Revision: 10402
Modified:
trunk/docs/user-manual/en/paging.xml
trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
Log:
Fixing PingStressTest
Modified: trunk/docs/user-manual/en/paging.xml
===================================================================
--- trunk/docs/user-manual/en/paging.xml 2011-03-28 21:13:54 UTC (rev 10401)
+++ trunk/docs/user-manual/en/paging.xml 2011-03-28 21:14:27 UTC (rev 10402)
@@ -16,7 +16,6 @@
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
-
<!DOCTYPE chapter PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd" [
<!ENTITY % BOOK_ENTITIES SYSTEM "HornetQ_User_Manual.ent">
%BOOK_ENTITIES;
@@ -36,9 +35,12 @@
<title>Page Files</title>
<para>Messages are stored per address on the file system. Each address has an individual
folder where messages are stored in multiple files (page files). Each file will contain
- messages up to a max configured size (<literal>page-size-bytes</literal>). When reading
- page-files all messages on the page-file are read, routed and the file is deleted as
- soon as the messages are recovered.</para>
+ messages up to a max configured size (<literal>page-size-bytes</literal>). The system
+ will navigate on the files as needed, and it will remove the page file as soon as all
+ the messages are acknowledged up to that point.</para>
+ <para>Browsers will read through the page-cursor system.</para>
+ <para>Consumers with selectors will also navigate through the page-files and it will ignore
+ messages that don't match the criteria.</para>
</section>
<section id="paging.main.config">
<title>Configuration</title>
@@ -160,15 +162,18 @@
able to continue sending.</para>
<para>To do this just set the <literal>address-full-policy</literal> to <literal
>BLOCK</literal> in the address settings</para>
- <para>In the default configuration, all addresses are configured to block producers after 10 MiB of data are in the address.</para>
+ <para>In the default configuration, all addresses are configured to block producers after 10
+ MiB of data are in the address.</para>
</section>
<section>
<title>Caution with Addresses with Multiple Queues</title>
<para>When a message is routed to an address that has multiple queues bound to it, e.g. a
- JMS subscription, there is only 1 copy of the message in memory. Each queue only deals
- with a reference to this. Because of this the memory is only freed up once all queues
- referencing the message have delivered it. This means that if not all queues deliver the
- message we can end up in a state where messages are not delivered. </para>
+ JMS subscription in a Topic, there is only 1 copy of the message in memory. Each queue
+ only deals with a reference to this. Because of this the memory is only freed up once
+ all queues referencing the message have delivered it.</para>
+ <para>If you have a single lazy subscription, the entire address will suffer IO performance
+ hit as all the queues will have messages being sent through an extra storage on the
+ paging system.</para>
<para>For example:</para>
<itemizedlist>
<listitem>
@@ -190,47 +195,6 @@
messages.</para>
</section>
<section>
- <title>Paging and message selectors</title>
- <note>
- <para>Please note that message selectors will only operate on messages in memory. If you
- have a large amount of messages paged to disk and a selector that only matches some
- of the paged messages, then those messages won't be consumed until the messages in
- memory have been consumed. HornetQ does not scan through page files on disk to
- locate matching messages. To do this efficiently would mean implementing and
- managing indexes amongst other things. Effectively we would be writing a relational
- database! This is not the primary role of a messaging system. If you find yourself
- using selectors which only select small subsets of messages in very large queues
- which are too large to fit in memory at any one time, then you probably want a
- relational database not a messaging system - you're effectively executing queries
- over tables.</para>
- </note>
- </section>
- <section>
- <title>Paging and browsers</title>
- <note>
- <para>Please note that message browsers only operate over messages in memory. They do
- not operate over messages paged to disk. Messages are paged to disk
- <emphasis>before</emphasis> they are routed to any queues, so when they are
- paged, they are not in any queues, so will not appear when browsing any queues.
- </para>
- </note>
- </section>
- <section>
- <title>Paging and unacknowledged messages</title>
- <note>
- <para> Please note that until messages are acknowledged they are still in memory on the
- server, so they contribute to the size of messages on a particular address. If
- messages are paged to disk for an address, and are being consumed, they will be
- depaged from disk when enough memory has been freed up in that address after
- messages have been consumed and acknowledged. However if messages are not
- acknowledged then more messages will not be depaged since there is no free space in
- memory. In this case message consumption can appear to hang. If not acknowledging
- explictly messages are acknowledged according to the <literal
- >ack-batch-size</literal> setting. Be careful not to set your paging max size to
- a figure lower than ack-batch-size or your system may appear to hang! </para>
- </note>
- </section>
- <section>
<title>Example</title>
<para>See <xref linkend="examples.paging"/> for an example which shows how to use paging
with HornetQ.</para>
Modified: trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-03-28 21:13:54 UTC (rev 10401)
+++ trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-03-28 21:14:27 UTC (rev 10402)
@@ -14,6 +14,7 @@
package org.hornetq.tests.stress.remote;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -122,11 +123,11 @@
server.getRemotingService().addInterceptor(noPongInterceptor);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(transportConfig);
+ locator.setClientFailureCheckPeriod(PingStressTest.PING_INTERVAL);
+ locator.setConnectionTTL((long)(PingStressTest.PING_INTERVAL * 1.5));
+ locator.setCallTimeout(PingStressTest.PING_INTERVAL * 10);
final ClientSessionFactory csf1 = locator.createSessionFactory();
- csf1.getServerLocator().setClientFailureCheckPeriod(PingStressTest.PING_INTERVAL);
- csf1.getServerLocator().setConnectionTTL((long)(PingStressTest.PING_INTERVAL * 1.5));
- csf1.getServerLocator().setCallTimeout(PingStressTest.PING_INTERVAL * 10);
final int numberOfSessions = 1;
final int numberOfThreads = 30;
@@ -153,12 +154,12 @@
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(transportConfig);
+ locator.setClientFailureCheckPeriod(PingStressTest.PING_INTERVAL);
+ locator.setConnectionTTL((long)(PingStressTest.PING_INTERVAL * 1.5));
+ locator.setCallTimeout(PingStressTest.PING_INTERVAL * 10);
+
final ClientSessionFactory csf2 = locator.createSessionFactory();
- csf2.getServerLocator().setClientFailureCheckPeriod(PingStressTest.PING_INTERVAL);
- csf2.getServerLocator().setConnectionTTL((long)(PingStressTest.PING_INTERVAL * 1.5));
- csf2.getServerLocator().setCallTimeout(PingStressTest.PING_INTERVAL * 10);
-
// Start all at once to make concurrency worst
flagAligned.countDown();
flagStart.await();
@@ -185,10 +186,15 @@
Thread.sleep(PingStressTest.PING_INTERVAL * (threadNumber % 3));
session.close();
+
+ csf2.close();
+
+ locator.close();
}
}
catch (Throwable e)
{
+ e.printStackTrace();
failure = e;
}
}
@@ -202,7 +208,7 @@
threads[i].start();
}
- flagAligned.await();
+ assertTrue(flagAligned.await(10, TimeUnit.SECONDS));
flagStart.countDown();
Throwable e = null;
@@ -219,6 +225,8 @@
{
throw new Exception("Test Failed", e);
}
+
+ csf1.close();
}
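
Note on the flagAligned change: the last hunk but one replaces an unbounded flagAligned.await() with assertTrue(flagAligned.await(10, TimeUnit.SECONDS)). The standalone sketch below is one way to see why the timed form is safer: if a worker never reaches countDown(), the timed await returns false instead of blocking forever, so the test can fail quickly. It uses only java.util.concurrent. The class name LatchTimeoutSketch, the helper awaitAll, and the timeout values are illustrative assumptions and are not part of PingStressTest.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; names and timeouts are hypothetical.
public class LatchTimeoutSketch {

    // Creates a latch expecting `expected` workers, starts only `arriving`
    // of them, and waits at most `timeoutMs` for the latch to reach zero.
    // Returns true if all expected workers checked in before the timeout.
    public static boolean awaitAll(int expected, int arriving, long timeoutMs)
            throws InterruptedException {
        CountDownLatch aligned = new CountDownLatch(expected);
        for (int i = 0; i < arriving; i++) {
            Thread worker = new Thread(aligned::countDown);
            worker.setDaemon(true); // do not keep the JVM alive for the sketch
            worker.start();
        }
        // Timed await: false means the timeout elapsed with workers missing.
        return aligned.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // All three workers arrive, so the latch opens well within the timeout.
        System.out.println("all arrived: " + awaitAll(3, 3, 2000));
        // One worker is missing; an untimed await() would hang here.
        System.out.println("one missing: " + awaitAll(3, 2, 200));
    }
}
```

In a JUnit test the boolean result would be wrapped in assertTrue, as the patch does, so a missing worker shows up as a failure rather than a hung run.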