JBoss hornetq SVN: r9070 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-07 17:45:14 -0400 (Wed, 07 Apr 2010)
New Revision: 9070
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
HORNETQ-354 - minor optimization on TimedBuffer (possible cause for JBAS-7877).
Acquiring the semaphore before starting the CheckTimer thread may avoid competing for semaphore.acquire() during startup.
The semaphore needs to be acquired anyway, so I will remove this possibility just in case.
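The ordering described in this log message can be illustrated with a minimal sketch. It is not the TimedBuffer code: the class SpinLimiterSketch, its fields, and the thread name are hypothetical stand-ins, and only java.util.concurrent.Semaphore is a real API. The sketch assumes a single-permit semaphore that the starting thread takes before the timer thread exists, so the timer thread parks on acquire() until stop() releases the permit.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for TimedBuffer; not the HornetQ class.
public class SpinLimiterSketch
{
   private final Semaphore spinLimiter = new Semaphore(1);

   private final AtomicInteger wakeUps = new AtomicInteger();

   private volatile boolean closed;

   private Thread timerThread;

   public void start()
   {
      // Take the only permit before the timer thread exists, so nothing can race for it.
      try
      {
         spinLimiter.acquire();
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         return;
      }

      timerThread = new Thread(new Runnable()
      {
         public void run()
         {
            while (!closed)
            {
               try
               {
                  spinLimiter.acquire(); // parks here until a permit is released
               }
               catch (InterruptedException e)
               {
                  return;
               }
               wakeUps.incrementAndGet();
               spinLimiter.release();
            }
         }
      }, "sketch-buffer-timeout");
      timerThread.start();
   }

   public void stop()
   {
      if (timerThread == null)
      {
         return;
      }
      closed = true; // close first, mirroring the order in the r9070 stop() hunk
      spinLimiter.release(); // then wake the parked thread so it can see the flag
      try
      {
         timerThread.join();
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
   }

   public int availablePermits()
   {
      return spinLimiter.availablePermits();
   }

   public int wakeUps()
   {
      return wakeUps.get();
   }

   public boolean isTimerAlive()
   {
      return timerThread != null && timerThread.isAlive();
   }
}
```

Because the permit is already held when the thread starts, the thread cannot proceed past acquire() until stop() is called, whichever thread the scheduler runs first.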
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-04-07 14:37:50 UTC (rev 9069)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-04-07 21:45:14 UTC (rev 9070)
@@ -127,6 +127,16 @@
return;
}
+
+ // Need to start with the spin limiter acquired
+ try
+ {
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
timerRunnable = new CheckTimer();
timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
@@ -140,15 +150,6 @@
logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
}
- // Need to start with the spin limiter acquired
- try
- {
- spinLimiter.acquire();
- }
- catch (InterruptedException ignore)
- {
- }
-
started = true;
}
@@ -163,10 +164,10 @@
bufferObserver = null;
+ timerRunnable.close();
+
spinLimiter.release();
- timerRunnable.close();
-
if (logRates)
{
logRatesTimerTask.cancel();
JBoss hornetq SVN: r9069 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-07 10:37:50 -0400 (Wed, 07 Apr 2010)
New Revision: 9069
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/message-expiry.xml
branches/HnetQ_323_cn/docs/user-manual/zh/undelivered-messages.xml
Log:
done
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/message-expiry.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/message-expiry.xml 2010-04-07 12:22:34 UTC (rev 9068)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/message-expiry.xml 2010-04-07 14:37:50 UTC (rev 9069)
@@ -17,78 +17,65 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="message-expiry">
- <title>Message Expiry</title>
- <para>Messages can be set with an optional <emphasis>time to live</emphasis> when sending
- them.</para>
- <para>HornetQ will not deliver a message to a consumer after it's time to live has been exceeded.
- If the message hasn't been delivered by the time that time to live is reached the server can
- discard it.</para>
- <para>HornetQ's addresses can be assigned a expiry address so that, when messages are expired,
- they are removed from the queue and sent to the expiry address. Many different queues can be
- bound to an expiry address. These <emphasis>expired</emphasis> messages can later be consumed
- for further inspection.</para>
+ <title>过期的消息</title>
+ <para>消息在发送时有一个可选的<emphasis>生存时间</emphasis>属性。</para>
+ <para>如果一个消息已经超过了它的生存时间,HornetQ不再将它传递给任何接收者。
+ 服务器会将过期的消息抛弃。</para>
+ <para>HornetQ的地址可以配置一个过期地址,当消息过期时,它们被从队列中删除并被转移到过期地址中。
+ 多个不同的队列可以绑定到一个过期地址上。这些过期的消息过后可以接收下来以供分析用。</para>
<section>
- <title>Message Expiry</title>
- <para>Using HornetQ Core API, you can set an expiration time directly on the message:</para>
+ <title>过期消息的配置</title>
+ <para>如果使用HornetQ核心API,可以直接在消息上设置过期时间:</para>
<programlisting>
// message will expire in 5000ms from now
message.setExpiration(System.currentTimeMillis() + 5000);
</programlisting>
- <para>JMS MessageProducer allows to set a TimeToLive for the messages it sent:</para>
+ <para>JMS的MessageProducer可以设置一个TimeToLive来控制其发送的消息:</para>
<programlisting>
// messages sent by this producer will be retained for 5s (5000ms) before expiration
producer.setTimeToLive(5000);
</programlisting>
- <para>Expired messages which are consumed from an expiry address have the following
- properties:</para>
+ <para>从过期地址中接收下来的消息有以下属性:</para>
<itemizedlist>
<listitem>
<para><literal>_HQ_ORIG_ADDRESS</literal></para>
- <para>a String property containing the <emphasis>original address</emphasis> of the
- expired message </para>
+ <para>这是一个字符串,它是该消息的<emphasis>原始地址</emphasis>。</para>
</listitem>
<listitem>
<para><literal>_HQ_ACTUAL_EXPIRY</literal></para>
- <para>a Long property containing the <emphasis>actual expiration time</emphasis> of the
- expired message</para>
+ <para>一个长整型量,代表此消息<emphasis>实际过期时间</emphasis>。</para>
</listitem>
</itemizedlist>
</section>
<section id="message-expiry.configuring">
- <title>Configuring Expiry Addresses</title>
- <para>Expiry address are defined in the address-setting configuration:</para>
+ <title>配置过期地址</title>
+ <para>过期地址配置在地址设置(address-setting)中:</para>
<programlisting>
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue -->
<address-setting match="jms.queue.exampleQueue">
<expiry-address>jms.queue.expiryQueue</expiry-address>
</address-setting>
</programlisting>
- <para>If messages are expired and no expiry address is specified, messages are simply removed
- from the queue and dropped. Address wildcards can be used to configure expiry address for a
- set of addresses (see <xref linkend="wildcard-syntax"/>).</para>
+ <para>如果没有定义过期地址,当一个消息过期时,它将被删除。配置过期地址时可以使用通配符
+ 来给一组地址配置过期地址。(参见<xref linkend="wildcard-syntax"/>)。</para>
</section>
<section id="configuring.expiry.reaper">
- <title>Configuring The Expiry Reaper Thread</title>
- <para>A reaper thread will periodically inspect the queues to check if messages have
- expired.</para>
- <para>The reaper thread can be configured with the following properties in <literal
- >hornetq-configuration.xml</literal></para>
+ <title>配置过期回收线程</title>
+ <para>HornetQ有一个回收线程定期地检查队列中的消息,目的是发现是否有消息过期。</para>
+ <para>在<literal>hornetq-configuration.xml</literal>文件中可以对回收线程进行配置,参数如下:</para>
<itemizedlist>
<listitem>
<para><literal>message-expiry-scan-period</literal></para>
- <para>How often the queues will be scanned to detect expired messages (in milliseconds,
- default is 30000ms)</para>
+ <para>过期消息的扫描间隔(单位毫秒,默认为30000ms)。</para>
</listitem>
<listitem>
<para><literal>message-expiry-thread-priority</literal></para>
- <para>The reaper thread priority (it must be between 0 and 9, 9 being the highest
- priority, default is 3)</para>
+ <para>回收线程的优先级(为0到9的整数,9优先级最高。默认是3)。</para>
</listitem>
</itemizedlist>
</section>
<section>
- <title>Example</title>
- <para>See <xref linkend="examples.expiry"/> for an example which shows how message expiry is
- configured and used with JMS.</para>
+ <title>例子</title>
+ <para>参见<xref linkend="examples.expiry"/>。这个例子展示了在JMS中如何配置使用消息过期功能。</para>
</section>
</chapter>
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/undelivered-messages.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/undelivered-messages.xml 2010-04-07 12:22:34 UTC (rev 9068)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/undelivered-messages.xml 2010-04-07 14:37:50 UTC (rev 9069)
@@ -17,70 +17,56 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="undelivered-messages">
- <title>Message Redelivery and Undelivered Messages</title>
- <para>Messages can be delivered unsuccessfully (e.g. if the transacted session used to consume
- them is rolled back). Such a message goes back to its queue ready to be redelivered. However,
- this means it is possible for a message to be delivered again and again without any success
- and remain in the queue, clogging the system.</para>
- <para>There are 2 ways to deal with these undelivered messages:</para>
+ <title>消息再传递及未传递的消息</title>
+ <para>消息有可能传递失败(比如相关的事务发生回滚)。失败的消息将退回到队列中准备重新传递。这样就会出现
+ 一种情况,就是同一个消息会被反复的传递而总不成功,以至于使系统处于忙的状态。</para>
+ <para>对于这样的消息我们有两种处理方法:</para>
<itemizedlist>
<listitem>
- <para>Delayed redelivery.</para>
- <para>It is possible to delay messages redelivery to let the client some time to recover
- from transient failures and not overload its network or CPU resources</para>
+ <para>延迟再传递</para>
+ <para>这种方法是让消息再次传递时有一定的时间延迟,这样客户端就有机会从故障中恢复,同时网络连接和CPU资源
+ 也不致于被过度占用。</para>
</listitem>
<listitem>
- <para>Dead Letter Address.</para>
- <para>It is also possible to configure a dead letter address so that after a specified
- number of unsuccessful deliveries, messages are removed from the queue and will not be
- delivered again</para>
+ <para>死信(Dead Letter)地址</para>
+ <para>这种方法是规定一个死信地址,如果消息再被反复传递达到一定次数时,就会从原有队列中删除,转到这个
+ 死信地址中。这样消息就不会永远地重复传递了。</para>
</listitem>
</itemizedlist>
- <para>Both options can be combined for maximum flexibility.</para>
+ <para>以上两种方法可以合理搭配使用,使解决方案更加灵活。</para>
<section>
- <title>Delayed Redelivery</title>
- <para>Delaying redelivery can often be useful in the case that clients regularly fail or
- rollback. Without a delayed redelivery, the system can get into a "thrashing" state, with
- delivery being attempted, the client rolling back, and delivery being re-attempted ad
- infinitum in quick succession, consuming valuable CPU and network resources.</para>
+ <title>延迟再传递</title>
+ <para>延迟再传递对于时常出现故障或回滚的客户端十分有用。如果没有延迟,整个系统可能会处于一种”疯狂“的状态。
+ 就是消息被传递、回滚、再传递,这样反复不间断地进行着,将宝贵的CPU和网络资源占用。</para>
<section id="undelivered-messages.delay">
- <title>Configuring Delayed Redelivery</title>
- <para>Delayed redelivery is defined in the address-setting configuration:</para>
+ <title>延迟再传递的配置</title>
+ <para>延迟再传递的配置在地址设定内(address-setting):</para>
<programlisting>
<!-- delay redelivery of messages for 5s -->
<address-setting match="jms.queue.exampleQueue">
<redelivery-delay>5000</redelivery-delay>
</address-setting>
</programlisting>
- <para>If a <literal>redelivery-delay</literal> is specified, HornetQ will wait this delay
- before redelivering the messages</para>
- <para>By default, there is no redelivery delay (<literal>redelivery-delay</literal>is set
- to 0).</para>
- <para>Address wildcards can be used to configure redelivery delay for a set of addresses
- (see <xref linkend="wildcard-syntax"/>), so you don't have to specify redelivery delay
- individually for each address.</para>
+ <para>如果定义了<literal>redelivery-delay</literal>,HornetQ在再传递之前等待所定义的时间。</para>
+ <para>默认是没有延时的(即<literal>redelivery-delay</literal>的值是0)。</para>
+ <para>可以使用通配符为一组地址定义再传递的延迟(参见<xref linkend="wildcard-syntax"/>)。
+ </para>
</section>
<section>
- <title>Example</title>
- <para>See <xref linkend="examples.delayed-redelivery"/> for an example which shows how
- delayed redelivery is configured and used with JMS.</para>
+ <title>例子</title>
+ <para>参见 <xref linkend="examples.delayed-redelivery"/>。这是一个JMS应用中配置延迟再传递的例子。</para>
</section>
</section>
<section>
- <title>Dead Letter Addresses</title>
- <para>To prevent a client infinitely receiving the same undelivered message (regardless of
- what is causing the unsuccessful deliveries), messaging systems define <emphasis
- role="italic">dead letter addresses</emphasis>: after a specified unsuccessful delivery
- attempts, the message is removed from the queue and send instead to a dead letter address. </para>
- <para>Any such messages can then be diverted to queue(s) where they can later be perused by
- the system administrator for action to be taken.</para>
- <para>HornetQ's addresses can be assigned a dead letter address. Once the messages have be
- unsuccessfully delivered for a given number of attempts, they are removed from the queue
- and sent to the dead letter address. These <emphasis>dead letter</emphasis> messages can
- later be consumed for further inspection.</para>
+ <title>死信地址</title>
+ <para>通过定义一个<emphasis role="italic">死信地址</emphasis>也可以防止同一个消息被无休止地传递:
+ 当一个消息被重复传递一定次数后,就会从队列中删除并传递到定义好的死信地址中。</para>
+ <para>这些死信中的消息之后可以转发到某个队列中,以供系统管理员分析处理。</para>
+ <para>每个HornetQ的地址可以有一个死信地址。当一个消息被反复传递达一定次数时,它就会被从队列中删除并送到
+ 死信地址。这些<emphasis>死信</emphasis>消息可以被接收进行分析处理。</para>
<section id="undelivered-messages.configuring">
- <title>Configuring Dead Letter Addresses</title>
- <para>Dead letter address is defined in the address-setting configuration:</para>
+ <title>配置死信地址</title>
+ <para>死信地址定义在地址设定中(address-setting):</para>
<programlisting>
<!-- undelivered messages in exampleQueue will be sent to the dead letter address
deadLetterQueue after 3 unsuccessful delivery attempts
@@ -90,52 +76,41 @@
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
</programlisting>
- <para>If a <literal>dead-letter-address</literal> is not specified, messages will removed
- after <literal>max-delivery-attempts</literal> unsuccessful attempts.</para>
- <para>By default, messages are redelivered 10 times at the maximum. Set <literal
- >max-delivery-attempts</literal> to -1 for infinite redeliveries.</para>
- <para>For example, a dead letter can be set globally for a set of matching addresses and
- you can set <literal>max-delivery-attempts</literal> to -1 for a specific address
- setting to allow infinite redeliveries only for this address.</para>
- <para>Address wildcards can be used to configure dead letter settings for a set of
- addresses (see <xref linkend="wildcard-syntax"/>).</para>
+ <para>如果没有定义<literal>dead-letter-address</literal>,消息在经过
+ <literal>max-delivery-attempts</literal>次重复传递后被删除。</para>
+ <para>默认的重复传递次数为10。将<literal>max-delivery-attempts</literal>设定为-1
+ 表示无限次重复传递。</para>
+ <para>例如,对一组地址设置了一个通用的死信地址后,再将其中某个地址的<literal>max-delivery-attempts</literal>
+ 设定为-1时,那么只有这个地址的再传递次数是无限的。</para>
+ <para>可以使用通配符对一组地址设定死信地址(参见<xref linkend="wildcard-syntax"/>)。</para>
</section>
<section>
- <title>Dead Letter Properties</title>
- <para>Dead letter messages which are consumed from a dead letter address have the following
- property:</para>
+ <title>死信的属性</title>
+ <para>从死信地址接收到的消息有以下属性:</para>
<itemizedlist>
<listitem>
<para><literal>_HQ_ORIG_ADDRESS</literal></para>
- <para>a String property containing the <emphasis>original address</emphasis> of
- the dead letter message </para>
+ <para>这是一个字符串属性,它是该死信消息的<emphasis>原始地址</emphasis>。</para>
</listitem>
</itemizedlist>
</section>
<section>
- <title>Example</title>
- <para>See <xref linkend="examples.dead-letter"/> for an example which shows how dead letter
- is configured and used with JMS.</para>
+ <title>例子</title>
+ <para>参见<xref linkend="examples.dead-letter"/>。这个例子给出了在JMS应用中死信的配置与使用。</para>
</section>
</section>
<section id="configuring.delivery.count.persistence">
- <title>Delivery Count Persistence</title>
- <para>In normal use, HornetQ does not update delivery count <emphasis>persistently</emphasis>
- until a message is rolled back (i.e. the delivery count is not updated
- <emphasis>before</emphasis> the message is delivered to the consumer). In most messaging
- use cases, the messages are consumed, acknowledged and forgotten as soon as they are
- consumed. In these cases, updating the delivery count persistently before delivering the
- message would add an extra persistent step <emphasis>for each message delivered</emphasis>,
- implying a significant performance penalty.</para>
- <para>However, if the delivery count is not updated persistently before the message delivery
- happens, in the event of a server crash, messages might have been delivered but that will
- not have been reflected in the delivery count. During the recovery phase, the server will
- not have knowledge of that and will deliver the message with <literal>redelivered</literal>
- set to <literal>false</literal> while it should be <literal>true</literal>. </para>
- <para>As this behavior breaks strict JMS semantics, HornetQ allows to persist delivery count
- before message delivery but disabled it by default for performance implications.</para>
- <para>To enable it, set <literal>persist-delivery-count-before-delivery</literal> to <literal
- >true</literal> in <literal>hornetq-configuration.xml</literal>:</para>
+ <title>传递计数的持久化</title>
+ <para>通常情况下HornetQ在一个消息被回滚之前并不更新持久的传递计数(即在消息传递到接收者之前不会更新传递计数)。
+ 大多数情况下消息被接收、通知、然后被忘掉。这样对<emphasis>每一个消息</emphasis>的传递都要更新一次持久的
+ 传递计数,会显著降低系统的性能。</para>
+ <para>介是如果在消息传递之前不进行持久传递计数的更新,服务器一旦发生故障而崩溃,就会造成消息可能被传递出去而传递
+ 计数却没有正确反映出传递的結果。在恢复阶段,服务器将错误地将该消息的<literal>redelivered</literal>设为
+ <literal>false</literal>而不是<literal>true</literal>。 </para>
+ <para>这样是不符合严格的JMS要求的。因此HornetQ允许在消息传递前更新传递计数。但是默认不这样做,目的是优先考虑
+ 了它对性能的影响。</para>
+ <para>要想打开传递计数更新功能,将<literal>hornetq-configuration.xml</literal>文件中的
+ <literal>persist-delivery-count-before-delivery</literal>设为<literal>true</literal>即可:</para>
<programlisting>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
</programlisting>
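The dead-letter rules stated in the undelivered-messages.xml text above (a message leaves its queue after max-delivery-attempts failed deliveries, -1 means unlimited redelivery, and without a dead-letter-address the message is removed) can be written down as a small decision function. This is only a sketch under assumptions: DeadLetterSketch, Outcome, and afterFailedDelivery are hypothetical names, not HornetQ API, and the exact boundary the server uses when comparing the delivery count is assumed rather than taken from the source.

```java
// Hypothetical model of the documented dead-letter rules; not HornetQ code.
public class DeadLetterSketch
{
   public enum Outcome
   {
      REDELIVER, DEAD_LETTER, DROP
   }

   /**
    * @param failedDeliveries    number of unsuccessful deliveries so far
    * @param maxDeliveryAttempts value of max-delivery-attempts, -1 for unlimited
    * @param deadLetterAddress   value of dead-letter-address, or null if not set
    */
   public static Outcome afterFailedDelivery(final int failedDeliveries,
                                             final int maxDeliveryAttempts,
                                             final String deadLetterAddress)
   {
      // -1 disables the limit, so the message always goes back to its queue.
      if (maxDeliveryAttempts == -1 || failedDeliveries < maxDeliveryAttempts)
      {
         return Outcome.REDELIVER;
      }

      // Limit reached: move to the dead letter address if one is configured, otherwise remove.
      return deadLetterAddress != null ? Outcome.DEAD_LETTER : Outcome.DROP;
   }
}
```

With the example configuration from the manual (max-delivery-attempts set to 3), the third failed delivery is the one that takes the message out of its queue.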
JBoss hornetq SVN: r9068 - trunk/src/main/org/hornetq/jms/bridge/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-07 08:22:34 -0400 (Wed, 07 Apr 2010)
New Revision: 9068
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-298
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-07 12:19:43 UTC (rev 9067)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-07 12:22:34 UTC (rev 9068)
@@ -1261,6 +1261,11 @@
// If this fails we should attempt to cleanup or we might end up in some weird state
+ if (log.isTraceEnabled())
+ {
+ log.trace("Failed to connect bridge", e);
+ }
+
cleanup();
return false;
JBoss hornetq SVN: r9067 - trunk/tests/src/org/hornetq/tests/unit/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-07 08:19:43 -0400 (Wed, 07 Apr 2010)
New Revision: 9067
Added:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplPriorityTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-192
Added: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplPriorityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplPriorityTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplPriorityTest.java 2010-04-07 12:19:43 UTC (rev 9067)
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.server.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
+import org.hornetq.tests.util.UnitTestCase;
+
+public class QueueImplPriorityTest extends UnitTestCase
+{
+ // The tests ----------------------------------------------------------------
+
+ private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ private static final SimpleString queue1 = new SimpleString("queue1");
+
+ private static final SimpleString address1 = new SimpleString("address1");
+
+ class FakeFilter implements Filter
+ {
+ public SimpleString getFilterString()
+ {
+ return null;
+ }
+
+ public boolean match(final ServerMessage message)
+ {
+ return true;
+ }
+
+ }
+
+ public void testPriority() throws Exception
+ {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ FakeConsumer cons1 = new FakeConsumer(new FakeFilter());
+
+ QueueImpl queue = new QueueImpl(1, address1, queue1, null, // filter
+ false, // durable
+ true, // temporary
+ scheduledExecutor,
+ null, // post office
+ null, // storage manager
+ null, // address setting repo
+ executor); // executor
+
+ queue.addConsumer(cons1);
+
+ cons1.setStatusImmediate(HandleStatus.HANDLED);
+ MessageReference ref = generateReference(queue, 0);
+ ref.getMessage().setPriority((byte)2);
+ queue.addLast(ref);
+
+ ref = generateReference(queue, 1);
+ ref.getMessage().setPriority((byte)2);
+ queue.addLast(ref);
+
+ Assert.assertEquals(2, queue.getMessageCount());
+ Assert.assertEquals(2, queue.getDeliveringCount());
+ Assert.assertEquals(2, cons1.getReferences().size());
+
+ cons1.setStatusImmediate(HandleStatus.BUSY);
+ ref = generateReference(queue, 3);
+ ref.getMessage().setPriority((byte)2);
+ queue.addLast(ref);
+
+ ref = generateReference(queue, 4);
+ ref.getMessage().setPriority((byte)2);
+ queue.addLast(ref);
+
+ // This will initiate the priority queue iterator, which has 2 elements (msg 3,4)
+ queue.deliverNow();
+
+ Assert.assertEquals(4, queue.getMessageCount());
+ Assert.assertEquals(2, queue.getDeliveringCount());
+ Assert.assertEquals(2, cons1.getReferences().size());
+
+ cons1.setStatusImmediate(HandleStatus.HANDLED);
+ ref = generateReference(queue, 2);
+ ref.getMessage().setPriority((byte)4);
+ queue.addLast(ref);
+
+ ref = generateReference(queue, 5);
+ ref.getMessage().setPriority((byte)2);
+ queue.addLast(ref);
+
+ Assert.assertEquals(6, queue.getMessageCount());
+ Assert.assertEquals(2, queue.getDeliveringCount());
+ Assert.assertEquals(2, cons1.getReferences().size());
+
+ // Since the iterator is already initiated and there are more messages with lower priority
+ // It will deliver low priority messages first
+ queue.deliverNow();
+
+ Assert.assertEquals(6, queue.getMessageCount());
+ Assert.assertEquals(6, queue.getDeliveringCount());
+ Assert.assertEquals(6, cons1.getReferences().size());
+
+ for (int i = 0; i < 6; i++)
+ {
+ // System.out.println(cons1.getReferences().get(i).getMessage().getMessageID());
+ Assert.assertEquals(i, cons1.getReferences().get(i).getMessage().getMessageID());
+ }
+
+ executor.shutdown();
+ }
+}
JBoss hornetq SVN: r9066 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-07 08:12:01 -0400 (Wed, 07 Apr 2010)
New Revision: 9066
Added:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
Removed:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
Modified:
trunk/docs/user-manual/en/client-reconnection.xml
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-275
Modified: trunk/docs/user-manual/en/client-reconnection.xml
===================================================================
--- trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 12:12:01 UTC (rev 9066)
@@ -50,11 +50,10 @@
instance using the appropriate setter method.</para>
<para>If you're using core you can set these values directly on the <literal
>ClientSessionFactory</literal> instance using the appropriate setter method.</para>
- <para>The window is specified in bytes, and has a default value of <literal
- >1MiB</literal>.</para>
+ <para>The window is specified in bytes.</para>
<para>Setting this parameter to <literal>-1</literal> disables any buffering and prevents
any re-attachment from occurring, forcing reconnect instead. The default value for this
- parameter is <literal>-1</literal>.</para>
+ parameter is <literal>-1</literal>. (Which means by default no auto re-attachment will occur)</para>
</section>
<section>
<title>Session reconnection</title>
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -79,7 +79,8 @@
private final int ackBatchSize;
- private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false, ClientConsumerImpl.NUM_PRIORITIES);
+ private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false,
+ ClientConsumerImpl.NUM_PRIORITIES);
private final Runner runner = new Runner();
@@ -114,9 +115,11 @@
private boolean stopped = false;
private final AtomicLong forceDeliveryCount = new AtomicLong(0);
-
+
private final SessionQueueQueryResponseMessage queueInfo;
+ private volatile boolean ackIndividually;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -151,7 +154,7 @@
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
-
+
this.queueInfo = queueInfo;
}
@@ -192,7 +195,7 @@
// Effectively infinite
timeout = Long.MAX_VALUE;
}
-
+
boolean deliveryForced = false;
long start = -1;
@@ -414,6 +417,8 @@
lastAckedMessage = null;
creditsToSend = 0;
+
+ ackIndividually = false;
}
public synchronized void start()
@@ -435,7 +440,7 @@
{
return queueInfo;
}
-
+
public long getID()
{
return id;
@@ -463,16 +468,24 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessageInternal messageToHandle = message;
-
+
if (messageToHandle.getAddress() == null)
{
messageToHandle.setAddressTransient(queueInfo.getAddress());
}
messageToHandle.onReceipt(this);
-
+
+ if (message.getPriority() != 4)
+ {
+ // We have messages of different priorities so we need to ack them individually since the order
+ // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
+ // consumed in, which means that acking all up to won't work
+ ackIndividually = true;
+ }
+
// Add it to the buffer
buffer.addLast(messageToHandle, messageToHandle.getPriority());
@@ -546,9 +559,9 @@
// Need to send credits for the messages in the buffer
HQIterator<ClientMessageInternal> iter = buffer.iterator();
-
+
ClientMessageInternal message;
-
+
while ((message = iter.next()) != null)
{
flowControlBeforeConsumption(message);
@@ -575,16 +588,28 @@
public void acknowledge(final ClientMessage message) throws HornetQException
{
ClientMessageInternal cmi = (ClientMessageInternal)message;
-
- ackBytes += message.getEncodeSize();
- if (ackBytes >= ackBatchSize)
+ if (ackIndividually)
{
- doAck(cmi);
+ if (lastAckedMessage != null)
+ {
+ flushAcks();
+ }
+
+ session.individualAcknowledge(id, message.getMessageID());
}
else
{
- lastAckedMessage = cmi;
+ ackBytes += message.getEncodeSize();
+
+ if (ackBytes >= ackBatchSize)
+ {
+ doAck(cmi);
+ }
+ else
+ {
+ lastAckedMessage = cmi;
+ }
}
}
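The comment added to ClientConsumerImpl above explains why mixed priorities force individual acknowledgement: the client may consume messages in a different order from the one in which the server delivered them, so "ack everything up to this message" would also acknowledge messages that have not been consumed yet. The following sketch illustrates that reasoning with plain lists. AckSketch and its methods are hypothetical and do not reflect how ServerConsumerImpl stores its delivery list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a server-side delivery list; not HornetQ code.
public class AckSketch
{
   // Cumulative ack: removes messageID and every message delivered before it.
   public static List<Long> ackUpTo(final List<Long> deliveryList, final long messageID)
   {
      List<Long> remaining = new ArrayList<Long>(deliveryList);
      int index = remaining.indexOf(messageID);
      // An unknown ID gives index -1, which makes this an empty range and a no-op.
      remaining.subList(0, index + 1).clear();
      return remaining;
   }

   // Individual ack: removes only the message with the given ID.
   public static List<Long> ackIndividually(final List<Long> deliveryList, final long messageID)
   {
      List<Long> remaining = new ArrayList<Long>(deliveryList);
      remaining.remove(Long.valueOf(messageID));
      return remaining;
   }
}
```

Suppose messages 1 and 2 arrive with priority 4 and message 3 with a higher priority, so the client consumes 3 first. A cumulative ack of 3 empties the delivery list even though 1 and 2 are still unconsumed, while an individual ack of 3 leaves 1 and 2 in place.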
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,8 +51,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
@@ -576,10 +577,10 @@
}
public ClientMessage createMessage(final byte type,
- final boolean durable,
- final long expiration,
- final long timestamp,
- final byte priority)
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority)
{
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize);
}
@@ -712,6 +713,30 @@
}
}
+ public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+ {
+ // if we're pre-acknowledging then we don't need to do anything
+ if (preAcknowledge)
+ {
+ return;
+ }
+
+ checkClosed();
+
+ SessionIndividualAcknowledgeMessage message = new SessionIndividualAcknowledgeMessage(consumerID,
+ messageID,
+ blockOnAcknowledge);
+
+ if (blockOnAcknowledge)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.sendBatched(message);
+ }
+ }
+
public void expire(final long consumerID, final long messageID) throws HornetQException
{
checkClosed();
@@ -719,7 +744,7 @@
// We don't send expiries for pre-ack since message will already have been acked on server
if (!preAcknowledge)
{
- SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+ SessionExpireMessage message = new SessionExpireMessage(consumerID, messageID);
channel.send(message);
}
@@ -851,9 +876,9 @@
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
-
+
int lcid = channel.getLastConfirmedCommandID();
-
+
Packet request = new ReattachSessionMessage(name, lcid);
Channel channel1 = backupConnection.getChannel(1, -1);
@@ -864,11 +889,11 @@
{
// The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastConfirmedCommandID());
+ channel.replayCommands(response.getLastConfirmedCommandID());
}
else
{
-
+
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session won't exist or the target server has been restarted - in this case the session will need to be
// recreated,
@@ -892,7 +917,8 @@
autoCommitAcks,
preAcknowledge,
confirmationWindowSize,
- defaultAddress == null ? null : defaultAddress.toString());
+ defaultAddress == null ? null
+ : defaultAddress.toString());
boolean retry = false;
do
{
@@ -924,7 +950,7 @@
for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-
+
// We try and recreate any non durable queues, since they probably won't be there unless
// they are defined in hornetq-configuration.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
@@ -950,7 +976,7 @@
false);
sendPacketWithoutLock(createConsumerRequest);
-
+
int clientWindowSize = entry.getValue().getClientWindowSize();
if (clientWindowSize != 0)
@@ -995,7 +1021,7 @@
channel.returnBlocking();
}
- channel.setTransferring(false);
+ channel.setTransferring(false);
}
catch (Throwable t)
{
@@ -1014,15 +1040,15 @@
// not having any credits to send
}
}
-
+
private volatile SimpleString defaultAddress;
-
+
public void setAddress(final Message message, final SimpleString address)
{
if (defaultAddress == null)
{
defaultAddress = address;
-
+
message.setAddress(address);
}
else
@@ -1037,9 +1063,7 @@
}
}
}
-
-
-
+
public void setPacketSize(final int packetSize)
{
if (packetSize > this.initialMessagePacketSize)
@@ -1083,7 +1107,7 @@
{
return producerCreditManager.getCredits(address, anon);
}
-
+
public void returnCredits(final SimpleString address)
{
producerCreditManager.returnCredits(address);
@@ -1093,7 +1117,7 @@
{
producerCreditManager.receiveCredits(address, credits);
}
-
+
public ClientProducerCreditManager getProducerCreditManager()
{
return producerCreditManager;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -33,6 +33,8 @@
String getName();
void acknowledge(long consumerID, long messageID) throws HornetQException;
+
+ void individualAcknowledge(long consumerID, long messageID) throws HornetQException;
boolean isCacheLargeMessageClient();
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -103,6 +103,11 @@
{
session.acknowledge(consumerID, messageID);
}
+
+ public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+ {
+ session.individualAcknowledge(consumerID, messageID);
+ }
public void addConsumer(final ClientConsumerInternal consumer)
{
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -24,6 +24,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -67,8 +68,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@@ -281,7 +283,7 @@
}
case SESS_EXPIRED:
{
- SessionExpiredMessage message = (SessionExpiredMessage)packet;
+ SessionExpireMessage message = (SessionExpireMessage)packet;
session.expire(message.getConsumerID(), message.getMessageID());
break;
}
@@ -414,6 +416,17 @@
closeChannel = true;
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage)packet;
+ requiresResponse = message.isRequiresResponse();
+ session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
+ if (requiresResponse)
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case SESS_CONSUMER_CLOSE:
{
requiresResponse = true;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -188,13 +188,15 @@
{
resendCache.add(packet);
}
-
- connection.getTransportConnection().write(buffer, flush, batch);
}
finally
{
lock.unlock();
}
+
+ //The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+ //buffer is full, preventing any incoming buffers being handled and blocking failover
+ connection.getTransportConnection().write(buffer, flush, batch);
}
}
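Note on the ChannelImpl change above: the comment added in the hunk says the transport write must happen outside the lock, because with a blocking (OIO) transport a full TCP buffer would otherwise stall the thread while it holds the lock. The following is only a minimal sketch of that pattern, not the ChannelImpl code. The class name, the String packets and the in-memory "wire" list are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not HornetQ code: bookkeeping under the lock, blocking write outside it.
final class SendOutsideLockSketch
{
   private final ReentrantLock lock = new ReentrantLock();

   // Stand-in for the resend cache that is guarded by the lock
   private final List<String> resendCache = new ArrayList<String>();

   // Stand-in for the transport connection; a real write could block here
   final List<String> wire = new ArrayList<String>();

   // Records whether the sending thread still held the lock when it wrote
   boolean lockHeldDuringWrite;

   void send(final String packet)
   {
      lock.lock();
      try
      {
         resendCache.add(packet);
      }
      finally
      {
         lock.unlock();
      }

      // The potentially blocking call happens only after the lock is released
      write(packet);
   }

   private void write(final String packet)
   {
      lockHeldDuringWrite = lock.isHeldByCurrentThread();
      wire.add(packet);
   }

   public static void main(final String[] args)
   {
      SendOutsideLockSketch channel = new SendOutsideLockSketch();
      channel.send("packet-1");
      System.out.println("lock held during write: " + channel.lockHeldDuringWrite);
   }
}
```

One trade-off in this sketch: once the write is outside the lock, the lock no longer orders writes from concurrent senders, so the caller has to get ordering some other way if it needs it. Whether that matters for ChannelImpl is not stated in the commit.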
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -48,6 +48,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
@@ -114,8 +115,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -216,7 +218,7 @@
}
case SESS_EXPIRED:
{
- packet = new SessionExpiredMessage();
+ packet = new SessionExpireMessage();
break;
}
case SESS_COMMIT:
@@ -379,6 +381,11 @@
packet = new SessionConsumerCloseMessage();
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ packet = new SessionIndividualAcknowledgeMessage();
+ break;
+ }
case NULL_RESPONSE:
{
packet = new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -149,6 +149,8 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
+
+ public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -29,7 +29,7 @@
private long consumerID;
private long messageID;
-
+
private boolean requiresResponse;
// Static --------------------------------------------------------
Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java (from rev 9052, trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionExpireMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionExpireMessage(final long consumerID, final long messageID)
+ {
+ super(PacketImpl.SESS_EXPIRED);
+
+ this.consumerID = consumerID;
+
+ this.messageID = messageID;
+ }
+
+ public SessionExpireMessage()
+ {
+ super(PacketImpl.SESS_EXPIRED);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+
+ messageID = buffer.readLong();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionExpireMessage == false)
+ {
+ return false;
+ }
+
+ SessionExpireMessage r = (SessionExpireMessage)other;
+
+ return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class SessionExpiredMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long consumerID;
-
- private long messageID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionExpiredMessage(final long consumerID, final long messageID)
- {
- super(PacketImpl.SESS_EXPIRED);
-
- this.consumerID = consumerID;
-
- this.messageID = messageID;
- }
-
- public SessionExpiredMessage()
- {
- super(PacketImpl.SESS_EXPIRED);
- }
-
- // Public --------------------------------------------------------
-
- public long getConsumerID()
- {
- return consumerID;
- }
-
- public long getMessageID()
- {
- return messageID;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(consumerID);
-
- buffer.writeLong(messageID);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- consumerID = buffer.readLong();
-
- messageID = buffer.readLong();
- }
-
- @Override
- public boolean equals(final Object other)
- {
- if (other instanceof SessionExpiredMessage == false)
- {
- return false;
- }
-
- SessionExpiredMessage r = (SessionExpiredMessage)other;
-
- return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionIndividualAcknowledgeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionIndividualAcknowledgeMessage(final long consumerID, final long messageID, final boolean requiresResponse)
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+
+ this.consumerID = consumerID;
+
+ this.messageID = messageID;
+
+ this.requiresResponse = requiresResponse;
+ }
+
+ public SessionIndividualAcknowledgeMessage()
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+
+ buffer.writeBoolean(requiresResponse);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+
+ messageID = buffer.readLong();
+
+ requiresResponse = buffer.readBoolean();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionIndividualAcknowledgeMessage == false)
+ {
+ return false;
+ }
+
+ SessionIndividualAcknowledgeMessage r = (SessionIndividualAcknowledgeMessage)other;
+
+ return super.equals(other) && consumerID == r.consumerID &&
+ messageID == r.messageID &&
+ requiresResponse == r.requiresResponse;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
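Note on the new packet above: encodeRest writes consumerID, messageID and requiresResponse in that order, and decodeRest reads them back in the same order. A rough round-trip sketch of that layout follows, using java.nio.ByteBuffer in place of HornetQBuffer. The class name is hypothetical, and the one-byte boolean encoding is an assumption of this sketch rather than something the diff shows.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the field layout only; ByteBuffer stands in for HornetQBuffer.
final class IndividualAckWireSketch
{
   final long consumerID;

   final long messageID;

   final boolean requiresResponse;

   IndividualAckWireSketch(final long consumerID, final long messageID, final boolean requiresResponse)
   {
      this.consumerID = consumerID;
      this.messageID = messageID;
      this.requiresResponse = requiresResponse;
   }

   // Same field order as encodeRest: long, long, boolean
   ByteBuffer encode()
   {
      ByteBuffer buffer = ByteBuffer.allocate(8 + 8 + 1);
      buffer.putLong(consumerID);
      buffer.putLong(messageID);
      // Assumption: the boolean is written as a single byte, 1 for true and 0 for false
      buffer.put((byte)(requiresResponse ? 1 : 0));
      buffer.flip();
      return buffer;
   }

   // Same field order as decodeRest
   static IndividualAckWireSketch decode(final ByteBuffer buffer)
   {
      long consumerID = buffer.getLong();
      long messageID = buffer.getLong();
      boolean requiresResponse = buffer.get() != 0;
      return new IndividualAckWireSketch(consumerID, messageID, requiresResponse);
   }

   public static void main(final String[] args)
   {
      IndividualAckWireSketch decoded = decode(new IndividualAckWireSketch(7L, 42L, true).encode());
      System.out.println(decoded.consumerID + " " + decoded.messageID + " " + decoded.requiresResponse);
   }
}
```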
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -38,9 +38,11 @@
Queue getQueue();
- MessageReference getExpired(long messageID) throws Exception;
+ MessageReference removeReferenceByID(long messageID) throws Exception;
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+
+ void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
void forceDelivery(long sequence);
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -18,7 +18,6 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.spi.core.protocol.SessionCallback;
/**
*
@@ -43,6 +42,8 @@
void removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception;
+
+ void individualAcknowledge(long consumerID, long messageID) throws Exception;
void expire(long consumerID, long messageID) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -517,8 +517,32 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
+
+ public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+ {
+ if (browseOnly)
+ {
+ return;
+ }
+
+ MessageReference ref = removeReferenceByID(messageID);
+
+ if (ref == null)
+ {
+ throw new IllegalStateException("Cannot find ref to ack " + messageID);
+ }
+
+ if (autoCommitAcks)
+ {
+ ref.getQueue().acknowledge(ref);
+ }
+ else
+ {
+ ref.getQueue().acknowledge(tx, ref);
+ }
+ }
- public MessageReference getExpired(final long messageID) throws Exception
+ public MessageReference removeReferenceByID(final long messageID) throws Exception
{
if (browseOnly)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -473,10 +473,22 @@
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
+
+ public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
+ {
+ ServerConsumer consumer = consumers.get(consumerID);
+
+ if (this.xa && tx == null)
+ {
+ throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
+ }
+ consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+ }
+
public void expire(final long consumerID, final long messageID) throws Exception
{
- MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+ MessageReference ref = consumers.get(consumerID).removeReferenceByID(messageID);
if (ref != null)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -201,7 +201,68 @@
session.deleteQueue(queue);
}
+
+ // https://jira.jboss.org/jira/browse/HORNETQ-275
+ public void testOutOfOrderAcknowledgement() throws Exception
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage m = createTextMessage(Integer.toString(i), session);
+ m.setPriority((byte)i);
+ producer.send(m);
+
+ Thread.sleep(20);
+ }
+
+ // Now we wait a little bit to make sure the messages are in the client side buffer
+
+ // They should have been added to the delivering list in the ServerConsumerImpl in the order
+ // they were sent, not priority order
+
+ //We receive one of the messages
+ ClientMessage m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(9, m.getPriority());
+
+ //Ack it
+ m.acknowledge();
+
+ consumer.close();
+
+ //Close and try and receive the other ones
+
+ consumer = session.createConsumer(queue);
+
+ // Other messages should be received now
+ // Previously there was a bug whereby if deliveries were stored on server side in send order
+ // then if received in priority order, and acked
+ // the ack would ack all messages up to the one received - resulting in acking
+ // messages that hadn't been delivered yet
+ for (int i = 8; i >= 0; i--)
+ {
+ m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(i, m.getPriority());
+
+ m.acknowledge();
+ }
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,6 +51,7 @@
final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.integration.transports.netty.NettyConnectorFactory"));
sf.setReconnectAttempts(-1);
sf.setConfirmationWindowSize(1024 * 1024);
+ sf.setAckBatchSize(0);
return sf;
}
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -881,10 +881,10 @@
protected ClientMessage createTextMessage(final String s, final boolean durable, final ClientSession clientSession)
{
ClientMessage message = clientSession.createMessage(HornetQTextMessage.TYPE,
- durable,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ durable,
+ 0,
+ System.currentTimeMillis(),
+ (byte)4);
message.getBodyBuffer().writeString(s);
return message;
}
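Note on r9066 as a whole: the comment in testOutOfOrderAcknowledgement describes the bug behind HORNETQ-275. Deliveries were stored on the server in send order but received in priority order, so an ack that removed everything up to the received message also acked messages that had not been delivered yet. The sketch below models that difference on a plain list of message IDs. It is a simplified illustration, not the ServerConsumerImpl code, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a consumer's delivering list, held in send order.
final class AckSketch
{
   // Cumulative ack: removes every entry from the head of the list up to and including messageID.
   // If messageID is absent, this sketch drains the whole list.
   static List<Long> cumulativeAck(final List<Long> delivering, final long messageID)
   {
      List<Long> acked = new ArrayList<Long>();
      Iterator<Long> iter = delivering.iterator();
      while (iter.hasNext())
      {
         long id = iter.next();
         iter.remove();
         acked.add(id);
         if (id == messageID)
         {
            break;
         }
      }
      return acked;
   }

   // Individual ack: removes only the entry whose ID matches, wherever it sits in the list.
   static List<Long> individualAck(final List<Long> delivering, final long messageID)
   {
      List<Long> acked = new ArrayList<Long>();
      if (delivering.remove(Long.valueOf(messageID)))
      {
         acked.add(messageID);
      }
      return acked;
   }

   static List<Long> sendOrder(final int count)
   {
      List<Long> delivering = new ArrayList<Long>();
      for (long i = 0; i < count; i++)
      {
         delivering.add(i);
      }
      return delivering;
   }

   public static void main(final String[] args)
   {
      // Ten messages sent as IDs 0..9; the client receives ID 9 first (highest priority) and acks it
      System.out.println("cumulative ack of 9 removes: " + cumulativeAck(sendOrder(10), 9L));
      System.out.println("individual ack of 9 removes: " + individualAck(sendOrder(10), 9L));
   }
}
```

In this model the cumulative variant removes all ten entries when the last-sent message is acked first, while the individual variant leaves the other nine in place, which matches what the new test expects to receive afterwards.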
JBoss hornetq SVN: r9065 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-07 04:35:35 -0400 (Wed, 07 Apr 2010)
New Revision: 9065
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/send-guarantees.xml
Log:
one more
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/send-guarantees.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/send-guarantees.xml 2010-04-06 23:24:37 UTC (rev 9064)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/send-guarantees.xml 2010-04-07 08:35:35 UTC (rev 9065)
@@ -17,132 +17,86 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="send-guarantees">
- <title>Guarantees of sends and commits</title>
+ <title>发送与提交的保证</title>
<section>
- <title>Guarantees of Transaction Completion</title>
- <para>When committing or rolling back a transaction with HornetQ, the request to commit or
- rollback is sent to the server, and the call will block on the client side until a
- response has been received from the server that the commit or rollback was
- executed.</para>
- <para>When the commit or rollback is received on the server, it will be committed to the
- journal, and depending on the value of the parameter <literal
- >journal-sync-transactional</literal> the server will ensure that the commit or
- rollback is durably persisted to storage before sending the response back to the client.
- If this parameter has the value <literal>false</literal> then commit or rollback may not
- actually get persisted to storage until some time after the response has been sent to
- the client. In event of server failure this may mean the commit or rollback never gets
- persisted to storage. The default value of this parameter is <literal>true</literal> so
- the client can be sure all transaction commits or rollbacks have been persisted to
- storage by the time the call to commit or rollback returns.</para>
- <para>Setting this parameter to <literal>false</literal> can improve performance at the
- expense of some loss of transaction durability.</para>
- <para>This parameter is set in <literal>hornetq-configuration.xml</literal></para>
+ <title>事务保证</title>
+ <para>在提交或回滚事务时,HornetQ将提交或回滚的请求发送到服务器,客户端阻塞等待服务器的响应。</para>
+ <para>当服务器端收到提交或回滚的请求时,它将事务信息记录到日志(journal)中。然后向客户端发回
+ 响应。参数<literal>journal-sync-transactional</literal>控制着如何向客户端发回响应。
+ 如果它的值是<literal>false</literal>,服务器向客户端发回响应时事务的处理結果不一定已经被
+ 保存到磁盘中。可能会在之后的某个时间保存。如果期间服务器发生故障那么事务的处理信息可能丢失。
+ 当它的值是<literal>true</literal>时,服务器将保证在向客户端发回响应时,事务的处理信息
+ 已经被保存到了磁盘中。默认值是<literal>true</literal>。</para>
+ <para>显然将这个参数设为<literal>false</literal>可以提高性能,但是要以牺牲事务的持久性为代价。</para>
+ <para>这个参数在 <literal>hornetq-configuration.xml</literal>文件中。</para>
</section>
<section id="non-transactional-sends">
- <title>Guarantees of Non Transactional Message Sends</title>
- <para>If you are sending messages to a server using a non transacted session, HornetQ can be
- configured to block the call to send until the message has definitely reached the
- server, and a response has been sent back to the client. This can be configured
- individually for durable and non-durable messages, and is determined by the
- following two parameters:</para>
+ <title>非事务性消息发送的保证</title>
+ <para>使用非事务性会话发送消息时,经过适当配置HornetQ,客户端在发送后以阻塞的方式等待,直到确认发出
+ 的消息已经到达服务器后再返回。可以对持久化或非持久化的消息分别配置,具体参数如下:</para>
<itemizedlist>
<listitem>
- <para><literal>BlockOnDurableSend</literal>. If this is set to <literal
- >true</literal> then all calls to send for durable messages on non
- transacted sessions will block until the message has reached the server, and a
- response has been sent back. The default value is <literal>true</literal>.
+ <para><literal>BlockOnDurableSend</literal>。如果设为<literal>true</literal>则通过
+ 非事务性会话发送持久消息时,每次发送都将阻塞直到消息到达服务器并返回通知为止。默认值是
+ <literal>true</literal>。
</para>
</listitem>
<listitem>
- <para><literal>BlockOnNonDurableSend</literal>. If this is set to <literal
- >true</literal> then all calls to send for non-durable messages on non
- transacted sessions will block until the message has reached the server, and a
- response has been sent back. The default value is <literal
- >false</literal>.</para>
+ <para><literal>BlockOnNonDurableSend</literal>。如果设为<literal>true</literal>,
+ 则通过非事务性会话发送非持久消息时,每次发送都将阻塞直到消息到达服务器并返回通知为止。默认值是
+ <literal>false</literal>。</para>
</listitem>
</itemizedlist>
- <para>Setting block on sends to <literal>true</literal> can reduce performance since each
- send requires a network round trip before the next send can be performed. This means the
- performance of sending messages will be limited by the network round trip time (RTT) of
- your network, rather than the bandwidth of your network. For better performance we
- recommend either batching many messages sends together in a transaction since with a
- transactional session, only the commit / rollback blocks not every send, or, using
- HornetQ's advanced <emphasis>asynchronous send acknowledgements feature</emphasis>
- described in <xref linkend="asynchronous-send-acknowledgements"/>.</para>
- <para>If you are using JMS and you're using the JMS service on the server to load your JMS
- connection factory instances into JNDI then these parameters can be configured in
- <literal>hornetq-jms.xml</literal> using the elements <literal
- >block-on-durable-send</literal> and <literal
- >block-on-non-durable-send</literal>. If you're using JMS but not using JNDI then
- you can set these values directly on the <literal>HornetQConnectionFactory</literal>
- instance using the appropriate setter methods.</para>
- <para>If you're using core you can set these values directly on the <literal
- >ClientSessionFactory</literal> instance using the appropriate setter
- methods.</para>
- <para>When the server receives a message sent from a non transactional session, and that
- message is durable and the message is routed to at least one durable queue, then the
- server will persist the message in permanent storage. If the journal parameter <literal
- >journal-sync-non-transactional</literal> is set to <literal>true</literal> the
- server will not send a response back to the client until the message has been persisted
- and the server has a guarantee that the data has been persisted to disk. The default
- value for this parameter is <literal>true</literal>.</para>
+ <para>将发送设置为阻塞方式会降低程序的效率。因为每次发送都需要一次网络往返的过程,然后才可以进行下次发送。
+ 这样发送消息的速度将受网络往返时间(RTT)的限制。这样你的网络带宽就可能没有被充分利用。为了提高效率,我们
+ 建议采用事务来批量发送消息。因为在事务中,只有在提交或回滚时阻塞。另外你还可以利用HornetQ高级的
+ <emphasis>异步发送通知功能</emphasis>。这一功能在<xref linkend="asynchronous-send-acknowledgements"/>
+ 进行了描述。</para>
+ <para>使用JMS时,如果JMS的连接工厂是在服务器端被注册到JNDI服务,你需要配置
+ <literal>hornetq-jms.xml</literal>文件中的<literal>block-on-durable-send</literal>
+ 和<literal>block-on-non-durable-send</literal>。如果不使用JNDI,可以调用
+ <literal>HornetQConnectionFactory</literal>相应的设置方法进行配置。</para>
+ <para>如果你使用的是内核服务,你可以直接在<literal
+ >ClientSessionFactory</literal>上用相关的方法设置相应的参数。</para>
+ <para>当服务器从一个非事务性的会话收到一个消息时,如果这个消息是持久的并且此消息被路由到至少一个持久的队列中,
+ 则该消息会被持久化到永久存贮介质中。如果日志(journal)的参数<literal
+ >journal-sync-non-transactional</literal>设为<literal>true</literal>,服务器在向客户
+ 发送响应时,它能保证消息已经被持久化到磁盘中。默认值是<literal>true</literal>。</para>
</section>
<section id="send-guarantees.nontrans.acks">
- <title>Guarantees of Non Transactional Acknowledgements</title>
- <para>If you are acknowledging the delivery of a message at the client side using a non
- transacted session, HornetQ can be configured to block the call to acknowledge until the
- acknowledge has definitely reached the server, and a response has been sent back to the
- client. This is configured with the parameter <literal>BlockOnAcknowledge</literal>. If
- this is set to <literal>true</literal> then all calls to acknowledge on non transacted
- sessions will block until the acknowledge has reached the server, and a response has
- been sent back. You might want to set this to <literal>true</literal> if you want to
- implement a strict <emphasis>at most once</emphasis> delivery policy. The default value
- is <literal>false</literal></para>
+ <title>非事务性通知的保证</title>
+ <para>当客户端使用非事务性会话向服务器通知消息收到时,可以配置HornetQ使得客户端的通知阻塞直到服务器收到
+ 了通知并返回为止。其相应的配置参数是<literal>BlockOnAcknowledge</literal>。如果该参数设为
+ <literal>true</literal>则所有的通过非事务会话的消息通知都是阻塞式的。如果你想要的消息传递策略是
+ <emphasis>最多一次</emphasis>的话,那么你需要将此参数设为<literal>true</literal>。默认值是<literal>false</literal>。</para>
</section>
<section id="asynchronous-send-acknowledgements">
- <title>Asynchronous Send Acknowledgements</title>
- <para>If you are using a non transacted session but want a guarantee that every message sent
- to the server has reached it, then, as discussed in <xref
- linkend="non-transactional-sends"/>, you can configure HornetQ to block the call to
- send until the server has received the message, persisted it and sent back a response.
- This works well but has a severe performance penalty - each call to send needs to block
- for at least the time of a network round trip (RTT) - the performance of sending is thus
- limited by the latency of the network, <emphasis>not</emphasis> limited by the network
- bandwidth.</para>
- <para>Let's do a little bit of maths to see how severe that is. We'll consider a standard
- 1Gib ethernet network with a network round trip between the server and the client of
- 0.25 ms.</para>
- <para>With a RTT of 0.25 ms, the client can send <emphasis>at most</emphasis> 1000/ 0.25 =
- 4000 messages per second if it blocks on each message send.</para>
- <para>If each message is < 1500 bytes and a standard 1500 bytes MTU size is used on the
- network, then a 1GiB network has a <emphasis>theoretical</emphasis> upper limit of (1024
- * 1024 * 1024 / 8) / 1500 = 89478 messages per second if messages are sent without
- blocking! These figures aren't an exact science but you can clearly see that being
- limited by network RTT can have serious effect on performance.</para>
- <para>To remedy this, HornetQ provides an advanced new feature called <emphasis>asynchronous
- send acknowledgements</emphasis>. With this feature, HornetQ can be configured to
- send messages without blocking in one direction and asynchronously getting
- acknowledgement from the server that the messages were received in a separate stream. By
- de-coupling the send from the acknowledgement of the send, the system is not limited by
- the network RTT, but is limited by the network bandwidth. Consequently better throughput
- can be achieved than is possible using a blocking approach, while at the same time
- having absolute guarantees that messages have successfully reached the server.</para>
- <para>The window size for send acknowledgements is determined by the confirmation-window-size parameter on
- the connection factory or client session factory. Please see <xref linkend="client-reconnection"/> for more info on this.</para>
+ <title>异步发送通知</title>
+ <para>如果你使用的是非事务会话来发送消息,并且希望保证每个发送出去的消息都到达服务器的话,你可以将HornetQ配置
+ 成阻塞的方式,如<xref linkend="non-transactional-sends"/>讨论的那样。这样做的一个缺点是性能的降低。
+ 因为这样每发送一个消息就需要一次网络的往返通信。如果网络时延越长,消息发送的效率就越低。同时网络的带宽对消息
+ 的发送没有影响。</para>
+ <para>我们来做一个简单的计算。假设有一个1Gib的网络,客户端与服务器间往返时间为0.25ms。</para>
+ <para>这样,在阻塞方式的情况下,客户端<emphasis>最大</emphasis>的消息发送速度为 1000/ 0.25 =
+ 4000 消息每秒。</para>
+ <para>如果每个消息的大小< 1500字节,而且网络的最大传输单元(MTU)是1500字节。那么理论上1GiB的网络
+ 最大的传输速率是 (1024 * 1024 * 1024 / 8) / 1500 = 89478 消息每秒!尽管这不是一个精确的工程计算但
+ 你可以看出阻塞式的发送对性能的影响会有多大。</para>
+ <para>为了解决这个问题,HornetQ提供了一种新的功能,称为<emphasis>异步发送通知</emphasis>。
+ 它允许消息以非阻塞的方式发送,同时从另一个连接流中异步地接收服务器的通知。这样就使得消息的发送与通知分开来,
+ 避免了阻塞方式带来的缺点。在保证消息可靠发送到服务器的同时提高了吞吐量。</para>
+ <para>参数<literal>confirmation-window-size</literal>用来定义消息发送通知的窗口大小。它属于连接工厂或客户会话工厂。参见<xref linkend="client-reconnection"/>
+ 以获取更多的相关信息。</para>
<section>
- <title>Asynchronous Send Acknowledgements</title>
- <para>To use the feature using the core API, you implement the interface <literal
- >org.hornetq.api.core.client.SendAcknowledgementHandler</literal> and set a handler
- instance on your <literal>ClientSession</literal>.</para>
- <para>Then, you just send messages as normal using your <literal
- >ClientSession</literal>, and as messages reach the server, the server will send
- back an acknowledgement of the send asynchronously, and some time later you are
- informed at the client side by HornetQ calling your handler's <literal
- >sendAcknowledged(ClientMessage message)</literal> method, passing in a
- reference to the message that was sent.</para>
- <para>To enable asynchronous send acknowledgements you must make sure <literal>confirmation-window-size</literal> is set to a positive integer value, e.g. 10MiB</para>
- <para>Please see <xref linkend="asynchronous-send-acknowledgements-example"/> for a full
- working example.</para>
+ <title>异步发送通知</title>
+ <para>如果使用核心API,你需要实现<literal
+ >org.hornetq.api.core.client.SendAcknowledgementHandler</literal>接口并将一个实例设置到
+ <literal>ClientSession</literal>中。</para>
+ <para>然后使用这个<literal>ClientSession</literal>发送消息。当消息到达服务器后,服务器向客户端异步地发送通知,
+ 并在客户端调用你的SendAcknowledgementHandler实例的<literal
+ >sendAcknowledged(ClientMessage message)</literal>方法。其中传入的参数就是发送的消息的引用。</para>
+ <para>为了使异步发送通知正常工作你必须确保<literal>confirmation-window-size</literal>的值为一个正整数,例如 10MiB</para>
+ <para>相关的例子请参见 <xref linkend="asynchronous-send-acknowledgements-example"/>。</para>
</section>
</section>
</chapter>
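
The throughput figures quoted in the diff above (4000 messages per second when every send blocks, 89478 when sends do not block) can be reproduced with a few lines of Java. This is only a sketch of that arithmetic: the class and method names are hypothetical and are not part of HornetQ, and the one-message-per-1500-byte-frame model is the same simplification the manual text makes.

```java
// Hypothetical helper, not part of HornetQ: reproduces the arithmetic from the manual text.
final class SendThroughputEstimate {

    /** Upper bound on blocking sends per second: at most one send per network round trip. */
    static long maxBlockingSendsPerSecond(double roundTripMillis) {
        return (long) (1000.0 / roundTripMillis);
    }

    /** Theoretical upper bound when sends do not block: one message per full frame. */
    static long maxNonBlockingSendsPerSecond(long linkBitsPerSecond, int frameBytes) {
        long bytesPerSecond = linkBitsPerSecond / 8;
        return bytesPerSecond / frameBytes;
    }

    public static void main(String[] args) {
        long oneGibit = 1024L * 1024L * 1024L; // link speed used in the manual text
        System.out.println(maxBlockingSendsPerSecond(0.25));              // prints 4000
        System.out.println(maxNonBlockingSendsPerSecond(oneGibit, 1500)); // prints 89478
    }
}
```

The gap of roughly a factor of 22 between the two results is the motivation the manual gives for asynchronous send acknowledgements.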
JBoss hornetq SVN: r9064 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-06 19:24:37 -0400 (Tue, 06 Apr 2010)
New Revision: 9064
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-353 - fixing paging and the stress testsuite
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-04-06 15:31:44 UTC (rev 9063)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-04-06 23:24:37 UTC (rev 9064)
@@ -832,6 +832,13 @@
}
PagedMessage pagedMessage;
+
+ if (!message.isDurable())
+ {
+ // The address should never be transient when paging (even for non-persistent messages when paging)
+ // This will force everything to be persisted
+ message.bodyChanged();
+ }
if (transactionID != -1)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-04-06 15:31:44 UTC (rev 9063)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-04-06 23:24:37 UTC (rev 9064)
@@ -81,8 +81,19 @@
// Public --------------------------------------------------------
- public void testSendReceivePaging() throws Exception
+ public void testSendReceivePagingPersistent() throws Exception
{
+ internaltestSendReceivePaging(true);
+ }
+
+
+ public void testSendReceivePagingNonPersistent() throws Exception
+ {
+ internaltestSendReceivePaging(false);
+ }
+
+ private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
+ {
clearData();
Configuration config = createDefaultConfig();
@@ -126,7 +137,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- message = session.createMessage(true);
+ message = session.createMessage(persistentMessages);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -140,15 +151,18 @@
session.close();
- server.stop();
+ if (persistentMessages)
+ {
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+ }
- server = createServer(true,
- config,
- PagingTest.PAGE_SIZE,
- PagingTest.PAGE_MAX,
- new HashMap<String, AddressSettings>());
- server.start();
-
sf = createInVMFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
JBoss hornetq SVN: r9063 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-06 11:31:44 -0400 (Tue, 06 Apr 2010)
New Revision: 9063
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/flow-control.xml
branches/HnetQ_323_cn/docs/user-manual/zh/transaction-config.xml
Log:
done
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/flow-control.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/flow-control.xml 2010-04-06 15:30:28 UTC (rev 9062)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/flow-control.xml 2010-04-06 15:31:44 UTC (rev 9063)
@@ -17,98 +17,73 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="flow-control">
- <title>Flow Control</title>
- <para>Flow control is used to limit the flow of data between a client and server, or a server and
- another server in order to prevent the client or server being overwhelmed with data.</para>
+ <title>流控制</title>
+ <para>流控制是指对客户端与服务器之间,或者服务器之间的数据流量进行限制,目的是防止通信双方由于大量数据而过载。</para>
<section>
- <title>Consumer Flow Control</title>
- <para>This controls the flow of data between the server and the client as the client consumes
- messages. For performance reasons clients normally buffer messages before delivering to the
- consumer via the <literal>receive()</literal> method or asynchronously via a message
- listener. If the consumer cannot process messages as fast as they are being delivered and
- stored in the internal buffer, then you could end up with a situation where messages would
- keep building up possibly causing out of memory on the client if they cannot be processed
- in time.</para>
+ <title>接收者(consumer)流控制</title>
+ <para>这是指对客户端的接收者接收消息流的控制。通常为了提高效率,在客户端通常将消息放入缓存,然后再将缓存中
+ 的消息传递给接收者(consumer)。当接收者处理消息的速度小于服务器向其发送消息的速度时,就可能造成消息在
+ 客户端不断积累,最终引起内存溢出的错误。</para>
<section id="flow-control.consumer.window">
- <title>Window-Based Flow Control</title>
- <para>By default, HornetQ consumers buffer messages from the server in a client side buffer
- before the client consumes them. This improves performance: otherwise every time the
- client consumes a message, HornetQ would have to go the server to request the next
- message. In turn, this message would then get sent to the client side, if one was
- available.</para>
- <para>A network round trip would be involved for <emphasis>every</emphasis> message and
- considerably reduce performance.</para>
- <para>To prevent this, HornetQ pre-fetches messages into a buffer on each consumer. The
- total maximum size of messages (in bytes) that will be buffered on each consumer is
- determined by the <literal>consumer-window-size</literal> parameter.</para>
- <para>By default, the <literal>consumer-window-size</literal> is set to 1 MiB (1024 * 1024
- bytes).</para>
- <para>The value can be:</para>
+ <title>基于窗口的流控制</title>
+ <para>默认情况下HornetQ的接收者一端会将消息进行缓存以提高性能。如果不这样做,那每次接收者收到一个消息,
+ 都得通知服务器传递下一个消息,然后服务器再将下一个消息传递过来。这就增加了通信的次数。</para>
+ <para>对于每一次消息传递都有一个网络的往返通信,这样降低了性能。</para>
+ <para>为了避免这样,HornetQ将每个接收者的消息提前接收到一处缓存中。每个缓存的最大值由
+ <literal>consumer-window-size</literal>参数决定(单位字节)。</para>
+ <para><literal>consumer-window-size</literal>的默认值是 1 MiB (1024 * 1024
+ 字节)。</para>
+ <para>它的值可以是:</para>
<itemizedlist>
<listitem>
- <para><literal>-1</literal> for an <emphasis>unbounded</emphasis> buffer</para>
+ <para><literal>-1</literal> 代表<emphasis>大小无限制</emphasis>的缓存。</para>
</listitem>
<listitem>
- <para><literal>0</literal> to not buffer any messages. See <xref
- linkend="examples.no-consumer-buffering"/> for working example of a consumer
- with no buffering.</para>
+ <para><literal>0</literal> 代表不缓存消息。参见相关的例子 <xref
+ linkend="examples.no-consumer-buffering"/>。</para>
</listitem>
<listitem>
- <para><literal>>0</literal> for a buffer with the given maximum size in
- bytes.</para>
+ <para><literal>>0</literal> 代表缓存的最大字节数。</para>
</listitem>
</itemizedlist>
- <para>Setting the consumer window size can considerably improve performance depending on
- the messaging use case. As an example, let's consider the two extremes: </para>
+ <para>合理设置接收者的窗口大小可以显著提高性能。下面是两个极端的例子:</para>
<variablelist>
<varlistentry>
- <term>Fast consumers</term>
+ <term>快速接收者</term>
<listitem>
- <para>Fast consumers can process messages as fast as they consume them (or even
- faster)</para>
- <para>To allow fast consumers, set the <literal>consumer-window-size</literal> to
- -1. This will allow <emphasis>unbounded</emphasis> message buffering on the
- client side.</para>
- <para>Use this setting with caution: it can overflow the client memory if the
- consumer is not able to process messages as fast as it receives them.</para>
+ <para>所谓快速接收者是指消息的接收者处理消息的速度大于等于它的接收速度。</para>
+ <para>对于快速接收者可以将<literal>consumer-window-size</literal>设为
+ -1,使得客户端的消息缓存的大小 <emphasis>无限制</emphasis>。</para>
+ <para>请谨慎使用这一设置值: 如果接收者的消息处理速度比接收速度小,可造成客户端内存溢出。</para>
</listitem>
</varlistentry>
<varlistentry>
- <term>Slow consumers</term>
+ <term>慢接收者</term>
<listitem>
- <para>Slow consumers takes significant time to process each message and it is
- desirable to prevent buffering messages on the client side so that they can be
- delivered to another consumer instead.</para>
- <para>Consider a situation where a queue has 2 consumers; 1 of which is very slow.
- Messages are delivered in a round robin fashion to both consumers, the fast
- consumer processes all of its messages very quickly until its buffer is empty.
- At this point there are still messages awaiting to be processed in the buffer
- of the slow consumer thus preventing them being processed by the fast consumer.
- The fast consumer is therefore sitting idle when it could be processing the
- other messages. </para>
- <para>To allow slow consumers, set the <literal>consumer-window-size</literal> to
- 0 (for no buffer at all). This will prevent the slow consumer from buffering
- any messages on the client side. Messages will remain on the server side ready
- to be consumed by other consumers.</para>
- <para>Setting this to 0 can give deterministic distribution between multiple
- consumers on a queue.</para>
+ <para>所谓慢接收者是指接收者每处理一个消息就要花很多时间。这样将缓存关闭就比较合理。服务器可以将多余的
+ 消息传递给其它的接收者。</para>
+ <para>假设一个队列有2个接收者。其中一个接收者非常慢。消息被轮流传递到两个接收者。其中的快速接收者
+ 很快将其缓存中的消息处理完毕。同时慢接收者的缓存中还有一些消息等待处理。这样快速接收者在一段时间
+ 内就处于空闲状态。</para>
+ <para>这时,将<literal>consumer-window-size</literal> 设为0 (没有缓存),就可以适应
+ 慢接收者的情况。这样在慢接收者一方不会缓存消息,这使得快的接收者可以处理更多的消息,而不至于处于空闲
+ 状态。</para>
+ <para>这说明将它设置为0可以控制一个队列的消息在多个接收者之间的消息分配。</para>
</listitem>
</varlistentry>
</variablelist>
- <para>Most of the consumers cannot be clearly identified as fast or slow consumers but are
- in-between. In that case, setting the value of <literal>consumer-window-size</literal>
- to optimize performance depends on the messaging use case and requires benchmarks to
- find the optimal value, but a value of 1MiB is fine in most cases.</para>
+ <para>大多数情况下很难判断哪些接收者是快速的,哪些是慢速的。往往很多接收者是处于两者之间。这样对于
+ <literal>consumer-window-size</literal>的值就要视具体情况而定。有时需要进行一定的测试
+ 来决定它的最佳值。通常情况下将其设为1MiB可以满足大多数的应用情况。</para>
<section id="flow-control.core.api">
- <title>Using Core API</title>
- <para>If HornetQ Core API is used, the consumer window size is specified by <literal
- >ClientSessionFactory.setConsumerWindowSize()</literal> method and some of the
- <literal>ClientSession.createConsumer()</literal> methods.</para>
+ <title>使用核心接口(Core API)进行流控制</title>
+ <para>HornetQ的核心接口中,<literal
+ >ClientSessionFactory.setConsumerWindowSize()</literal>方法和一些
+ <literal>ClientSession.createConsumer()</literal>方法可以控制流的窗口大小。</para>
</section>
<section>
- <title>Using JMS</title>
- <para>if JNDI is used to look up the connection factory, the consumer window size is
- configured in <literal>hornetq-jms.xml</literal>:</para>
+ <title>使用JMS的流控制</title>
+ <para>若使用JNDI来获得连接工厂,则需要通过配置<literal>hornetq-jms.xml</literal>文件来设定窗口大小:</para>
<programlisting>
<connection-factory name="ConnectionFactory">
<connectors>
@@ -122,36 +97,28 @@
<consumer-window-size>0</consumer-window-size>
</connection-factory>
</programlisting>
- <para>If the connection factory is directly instantiated, the consumer window size is
- specified by <literal>HornetQConnectionFactory.setConsumerWindowSize()</literal>
- method.</para>
- <para>Please see <xref linkend="examples.no-consumer-buffering"/> for an example which
- shows how to configure HornetQ to prevent consumer buffering when dealing with slow
- consumers.</para>
+ <para>如果直接实例化连接工厂,则使用<literal>HornetQConnectionFactory.setConsumerWindowSize()</literal>
+ 方法来设定窗口大小。</para>
+ <para>参见例子<xref linkend="examples.no-consumer-buffering"/>来了解如何配置HornetQ来
+ 关闭接收者的缓存。</para>
</section>
</section>
<section>
- <title>Rate limited flow control</title>
- <para>It is also possible to control the <emphasis>rate</emphasis> at which a consumer can
- consume messages. This is a form of throttling and can be used to make sure that a
- consumer never consumes messages at a rate faster than the rate specified. </para>
- <para>The rate must be a positive integer to enable this functionality and is the maximum
- desired message consumption rate specified in units of messages per second. Setting this
- to <literal>-1</literal> disables rate limited flow control. The default value is
- <literal>-1</literal>.</para>
- <para>Please see <xref linkend="examples.consumer-rate-limit"/> for a working example of
- limiting consumer rate.</para>
+ <title>速率流控制</title>
+ <para>我们还可以通过控制 <emphasis>速率</emphasis>的方法来控制流。这是一种像调节节流阀的形式。
+ 这种方法保证一个接收者接收消息的速率不会超过设定的值。 </para>
+ <para>速率必须是一个正整数。它代表最大接收速度,单位是消息每秒。将它设为<literal>-1</literal>就会关闭速率流控制。
+ 默认值是<literal>-1</literal>。</para>
+ <para>参见有关速率流控制的例子<xref linkend="examples.consumer-rate-limit"/>以进一步了解它的工作原理。</para>
<section id="flow-control.rate.core.api">
- <title>Using Core API</title>
- <para>If the HornetQ core API is being used the rate can be set via the <literal
- >ClientSessionFactory.setConsumerMaxRate(int consumerMaxRate)</literal> method or
- alternatively via some of the <literal>ClientSession.createConsumer()</literal>
- methods. </para>
+ <title>使用核心接口(Core API)</title>
+ <para>HornetQ的核心接口的<literal
+ >ClientSessionFactory.setConsumerMaxRate(int consumerMaxRate)</literal>方法或
+ 某些<literal>ClientSession.createConsumer()</literal>方法可以实现对流的速率控制。</para>
</section>
<section>
- <title>Using JMS</title>
- <para>If JNDI is used to look up the connection factory, the max rate can be configured
- in <literal>hornetq-jms.xml</literal>:</para>
+ <title>使用JMS</title>
+ <para>如果从JNDI中获取连接工厂,需要通过配置<literal>hornetq-jms.xml</literal>来进行速率流控制:</para>
<programlisting><connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
@@ -164,49 +131,37 @@
of 10 messages per sec -->
<consumer-max-rate>10</consumer-max-rate>
</connection-factory></programlisting>
- <para>If the connection factory is directly instantiated, the max rate size can be set
- via the <literal>HornetQConnectionFactory.setConsumerMaxRate(int
- consumerMaxRate)</literal> method.</para>
+ <para>如果是直接实例化连接工厂,则通过<literal>HornetQConnectionFactory.setConsumerMaxRate(int
+ consumerMaxRate)</literal>方法来设定最大流速率。</para>
<note>
- <para>Rate limited flow control can be used in conjunction with window based flow
- control. Rate limited flow control only effects how many messages a client can
- consume in a second and not how many messages are in its buffer. So if you had a
- slow rate limit and a high window based limit the clients internal buffer would
- soon fill up with messages.</para>
+ <para>速率流控制可以与窗口流控制结合使用。速率控制只规定了客户端每秒接收多少消息。因此如果你设定
+ 了一个较低的速率,同时又设定了一个大的缓存窗口,那么客户端的缓存将会很快饱和。</para>
</note>
- <para>Please see <xref linkend="examples.consumer-rate-limit"/> for an example which
- shows how to configure HornetQ to prevent consumer buffering when dealing with slow
- consumers.</para>
+ <para>参见接收速率流控制的例子<xref linkend="examples.consumer-rate-limit"/>进一步了解速率流控制的配置和使用。</para>
</section>
</section>
</section>
<section>
- <title>Producer flow control</title>
- <para>HornetQ also can limit the amount of data sent from a client to a server to prevent the
- server being overwhelmed.</para>
+ <title>发送者(producer)的流控制</title>
+ <para>HornetQ还可以控制客户端向服务器发送消息的速度,以避免服务器因大量数据过载。</para>
<section>
- <title>Window based flow control</title>
- <para>In a similar way to consumer window based flow control, HornetQ producers, by
- default, can only send messages to an address as long as they have sufficient credits to
- do so. The amount of credits required to send a message is given by the size of the
- message.</para>
- <para>As producers run low on credits they request more from the server, when the server
- sends them more credits they can send more messages.</para>
- <para>The amount of credits a producer requests in one go is known as the <emphasis
- role="italic">window size</emphasis>.</para>
- <para>The window size therefore determines the amount of bytes that can be in-flight at any
- one time before more need to be requested - this prevents the remoting connection from
- getting overloaded.</para>
+ <title>基于窗口的流控制</title>
+ <para>与接收者的相应的控制相似。在默认条件下,发送者要有足够的份额(credits)才可以向服务器的地址发送消息。
+ 这个份额就是消息的大小。</para>
+ <para>当发送者的份额不足时,它要向服务器请求更多的份额以便发送更多的消息。</para>
+ <para>发送者一次向服务器请求的份额值被称为<emphasis
+ role="italic">窗口大小</emphasis>。</para>
+ <para>于是窗口大小就是指发送者向服务器不间断发送消息的总最大字节数。当发送完毕时需再向服务器请求份额。这样就避免了
+ 服务器消息过载的情况。</para>
<section>
- <title>Using Core API</title>
- <para>If the HornetQ core API is being used, window size can be set via the <literal
+ <title>使用核心接口(Core API)</title>
+ <para>若使用核心接口,<literal
>ClientSessionFactory.setProducerWindowSize(int producerWindowSize)</literal>
- method.</para>
+ 方法可以对窗口大小进行设定。</para>
</section>
<section>
- <title>Using JMS</title>
- <para>If JNDI is used to look up the connection factory, the producer window size can be
- configured in <literal>hornetq-jms.xml</literal>:</para>
+ <title>使用JMS</title>
+ <para>如果使用JNDI来获得连接工厂,则需要配置<literal>hornetq-jms.xml</literal>文件以设定窗口大小:</para>
<programlisting>
<connection-factory name="ConnectionFactory">
<connectors>
@@ -217,39 +172,27 @@
</entries>
<producer-window-size>10</producer-window-size>
</connection-factory></programlisting>
- <para>If the connection factory is directly instantiated, the producer window size can
- be set via the <literal>HornetQConnectionFactory.setProducerWindowSize(int
- producerWindowSize)</literal> method.</para>
+ <para>如果是直接实例化连接工厂,则使用<literal>HornetQConnectionFactory.setProducerWindowSize(int
+ producerWindowSize)</literal>方法来设定窗口大小。</para>
</section>
<section>
- <title>Blocking producer window based flow control</title>
- <para>Normally the server will always give the same number of credits as have been
- requested. However, it is also possible to set a maximum size on any address, and the
- server will never send more credits than could cause the address's upper memory limit
- to be exceeded.</para>
- <para>For example, if I have a JMS queue called "myqueue", I could set the maximum
- memory size to 10MiB, and the the server will control the number of credits sent to
- any producers which are sending any messages to myqueue such that the total messages
- in the queue never exceeds 10MiB.</para>
- <para>When the address gets full, producers will block on the client side until more
- space frees up on the address, i.e. until messages are consumed from the queue thus
- freeing up space for more messages to be sent.</para>
- <para>We call this blocking producer flow control, and it's an efficient way to prevent
- the server running out of memory due to producers sending more messages than can be
- handled at any time.</para>
- <para>It is an alternative approach to paging, which does not block producers but
- instead pages messages to storage.</para>
- <para>To configure an address with a maximum size and tell the server that you want to
- block producers for this address if it becomes full, you need to define an
- AddressSettings (<xref linkend="queue-attributes.address-settings"/>) block for the
- address and specify <literal>max-size-bytes</literal> and <literal
- >address-full-policy</literal></para>
- <para>The address block applies to all queues registered to that address. I.e. the total
- memory for all queues bound to that address will not exceed <literal
- >max-size-bytes</literal>. In the case of JMS topics this means the <emphasis
- role="italic">total</emphasis> memory of all subscriptions in the topic won't
- exceed max-size-bytes.</para>
- <para>Here's an example:</para>
+ <title>限定发送者窗口流控制</title>
+ <para>通常情况下客户端请求多少份额,HornetQ服务器就给予多少份额。然而我们还可以针对每个地址来设定一个最大
+ 的份额值,以使服务器给出的份额都不大于该值。这样可以防止一个地址的内存溢出。</para>
+ <para>例如,如果有一个队列称为“myqueue”。将它的最大内存值设为10MiB,则服务器就会控制给出的份额以保证向该队列的地
+ 址发送消息时不会占大于10MiB的内存空间。</para>
+ <para>当一个地址已经满了的时候,发送者将会阻塞直到该地址有了多余的空间为止,即地址中的消息被接收了一部分后使得
+ 地址腾出了一些空间。</para>
+ <para>我们将这种控制方法称为限定发送者窗口流控制。这是一种有效的防止服务器内存溢出的手段。</para>
+ <para>它可以看成是分页转存(paging)的另一种方法。分页转存不阻塞发送者,它将消息转存到存贮介质上以节省内存的空间。</para>
+ <para>要配置一个地址的最大容量并告诉服务器在地址满了的情况下阻塞发送者,你需要为该地址定义一个
+ AddressSettings (<xref linkend="queue-attributes.address-settings"/>) 并设定
+ <literal>max-size-bytes</literal> 和 <literal
+ >address-full-policy</literal>。</para>
+ <para>这个配置对所有注册到该地址的队列有效。即所有注册队列的总内存将不超过 <literal
+ >max-size-bytes</literal>。对于JMS topic情况则意味着该topic的所有订阅的内存不能超过
+ max-size-bytes的设定值。</para>
+ <para>下面是一个例子:</para>
<programlisting>
<address-settings>
<address-setting match="jms.queue.exampleQueue">
@@ -257,38 +200,27 @@
<address-full-policy>DROP</address-full-policy>
</address-setting>
</address-settings></programlisting>
- <para>The above example would set the max size of the JMS queue "exampleQueue" to be
- 100000 bytes and would block any producers sending to that address to prevent that
- max size being exceeded.</para>
- <para>Note the policy must be set to <literal>DROP</literal> to enable blocking producer
- flow control.</para>
- <para>Please note the default value for <literal>address-full-policy</literal> is to
- <literal>PAGE</literal>. Please see the chapter on paging for more information on
- paging.</para>
+ <para>上面的例子将JMS队列"exampleQueue"的最大内存值设为
+ 100000 字节并且阻塞发送者以防止消息量超过这个值。</para>
+ <para>注意必须设置 <literal>DROP</literal>的策略才能打开限定发送者窗口控制。</para>
+ <para>请注意默认的<literal>address-full-policy</literal> 是
+ <literal>PAGE</literal>。请参阅分页转存(paging)的相关章节作进一步的了解。</para>
</section>
</section>
<section>
- <title>Rate limited flow control</title>
- <para>HornetQ also allows the rate a producer can emit message to be limited, in units of
- messages per second. By specifying such a rate, HornetQ will ensure that producer never
- produces messages at a rate higher than that specified.</para>
- <para>The rate must be a positive integer to enable this functionality and is the maximum
- desired message consumption rate specified in units of messages per second. Setting this
- to <literal>-1</literal> disables rate limited flow control. The default value is
- <literal>-1</literal>.</para>
- <para>Please see the <xref linkend="producer-rate-limiting-example"/> for a working example
- of limiting producer rate.</para>
+ <title>速率流控制</title>
+ <para>HornetQ也可以控制发送者发送消息的速率。单位是每秒消息数。通过设定速率可保证发送者的发送速率不超过某个值。</para>
+ <para>速率必须是一个正整数。如果设为 <literal>-1</literal> 则关闭速率流控制。默认值是<literal>-1</literal>。</para>
+ <para>请参见例子<xref linkend="producer-rate-limiting-example"/>进一步了解速率流控制的使用方法。</para>
<section id="flow-control.producer.rate.core.api">
- <title>Using Core API</title>
- <para>If the HornetQ core API is being used the rate can be set via the <literal
- >ClientSessionFactory.setProducerMaxRate(int consumerMaxRate)</literal> method or
- alternatively via some of the <literal>ClientSession.createProducer()</literal>
- methods. </para>
+ <title>使用核心接口(Core API)</title>
+ <para>如果使用核心接口,<literal
+ >ClientSessionFactory.setProducerMaxRate(int consumerMaxRate)</literal>方法或
+ 某些 <literal>ClientSession.createProducer()</literal>方法可以设置最大速率值。</para>
</section>
<section>
- <title>Using JMS</title>
- <para>If JNDI is used to look up the connection factory, the max rate can be configured
- in <literal>hornetq-jms.xml</literal>:</para>
+ <title>使用 JMS</title>
+ <para>如果使用JNDI,需要配置<literal>hornetq-jms.xml</literal>文件:</para>
<programlisting><connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
@@ -301,9 +233,8 @@
of 10 messages per sec -->
<producer-max-rate>10</producer-max-rate>
</connection-factory></programlisting>
- <para>If the connection factory is directly instantiated, the max rate size can be set
- via the <literal>HornetQConnectionFactory.setProducerMaxRate(int
- consumerMaxRate)</literal> method.</para>
+ <para>如果直接实例化连接工厂,则使用<literal>HornetQConnectionFactory.setProducerMaxRate(int
+ consumerMaxRate)</literal>方法来设置。</para>
</section>
</section>
</section>
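
The producer window described in the flow-control.xml diff above (a producer spends credits equal to each message's size and asks for another window when it runs short) can be modelled roughly as follows. This is a simplified, single-threaded sketch under stated assumptions: `CreditWindow` is a hypothetical class, not HornetQ's implementation, and the "server" here always grants a full window immediately, whereas the manual says a real server may withhold credits when an address is full.

```java
// Hypothetical model of credit-based producer flow control; not HornetQ code.
final class CreditWindow {
    private final int windowSize; // credits requested from the server in one go
    private int available;        // credits (bytes) the producer may still send
    private int requests;         // how many times more credits were requested

    CreditWindow(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Spends credits for one message, requesting new windows while credits are short. */
    void send(int messageBytes) {
        while (available < messageBytes) {
            // Stands in for a round trip to the server; assumed to always succeed here.
            available += windowSize;
            requests++;
        }
        available -= messageBytes;
    }

    int available() { return available; }

    int requests() { return requests; }

    public static void main(String[] args) {
        CreditWindow window = new CreditWindow(1024);
        for (int i = 0; i < 4; i++) {
            window.send(300);
        }
        // Four 300-byte sends with a 1024-byte window need two credit requests.
        System.out.println(window.requests() + " requests, " + window.available() + " credits left");
    }
}
```

A larger window means fewer credit requests per byte sent, at the cost of more bytes that can be in flight before the server gets a chance to slow the producer down.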
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/transaction-config.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/transaction-config.xml 2010-04-06 15:30:28 UTC (rev 9062)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/transaction-config.xml 2010-04-06 15:31:44 UTC (rev 9063)
@@ -17,18 +17,13 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="transaction-config">
- <title>Resource Manager Configuration</title>
- <para>HornetQ has its own Resource Manager for handling the lifespan of JTA transactions. When a
- transaction is started the resource manager is notified and keeps a record of the
- transaction and its current state. It is possible in some cases for a transaction to be
- started but then forgotten about. Maybe the client died and never came back. If this happens
- then the transaction will just sit there indefinitely.</para>
- <para>To cope with this HornetQ can, if configured, scan for old transactions and rollback any
- it finds. The default for this is 3000000 milliseconds (5 minutes), i.e. any transactions older
- than 5 minutes are removed. This timeout can be changed by editing the <literal
- >transaction-timeout</literal> property in <literal>hornetq-configuration.xml</literal> (value must be in milliseconds).
- The property <literal>transaction-timeout-scan-period</literal> configures how often, in
- milliseconds, to scan for old transactions.</para>
- <para>Please note that HornetQ will not unilaterally rollback any XA transactions in a prepared state - this must be heuristically rolled
- back via the management API if you are sure they will never be resolved by the transaction manager.</para>
+ <title>配置资源管理器(Resource Manager)</title>
+ <para>HornetQ有自己的资源管理器来管理JTA事务。当一个事务开始时,资源管理器就得到通知并记录下该事务和它的状态。
+ 有的时候一个事务开始后,最終被忘记。有时客户端崩溃并且再也不能恢复,这样的话该事务就一直存在下去。</para>
+ <para>为了解决这个问题,可以配置HornetQ来扫描过期的事务,并且将它们回滚。默认值是3000000毫秒(5分钟)。
+ 它表示任何超过5分钟的事务都将被删除。这个超时对应的参数是<literal
+ >transaction-timeout</literal>,它在配置文件<literal>hornetq-configuration.xml</literal>中(单位毫秒)。
+ 参数<literal>transaction-timeout-scan-period</literal>定义了HornetQ扫描过期事务的间隔。</para>
+ <para>注意HornetQ不会单方面回滚一个已经处于准备状态的XA事务。如果你认为这些事务永远不会被事务管理器(transaction manager)
+ 来处理的话,你必须通过管理接口来进行回滚。</para>
</chapter>
JBoss hornetq SVN: r9062 - in trunk: src/main/org/hornetq/core/server/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-04-06 11:30:28 -0400 (Tue, 06 Apr 2010)
New Revision: 9062
Added:
trunk/tests/src/org/hornetq/tests/integration/core/
trunk/tests/src/org/hornetq/tests/integration/core/deployers/
trunk/tests/src/org/hornetq/tests/integration/core/deployers/impl/
trunk/tests/src/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java
Removed:
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
Modified:
trunk/src/main/org/hornetq/core/deployers/impl/QueueDeployer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-352: Core queues from XML Configuration are not deployed
* make the QueueDeployer use directly HornetQServer instead of the management interface HornetQServerControl
* moved test to integration package and use the real objects instead of fake ones
Modified: trunk/src/main/org/hornetq/core/deployers/impl/QueueDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/QueueDeployer.java 2010-04-06 10:03:01 UTC (rev 9061)
+++ trunk/src/main/org/hornetq/core/deployers/impl/QueueDeployer.java 2010-04-06 15:30:28 UTC (rev 9062)
@@ -13,9 +13,10 @@
package org.hornetq.core.deployers.impl;
-import org.hornetq.api.core.management.HornetQServerControl;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.deployers.DeploymentManager;
+import org.hornetq.core.server.HornetQServer;
import org.w3c.dom.Node;
/**
@@ -27,15 +28,15 @@
*/
public class QueueDeployer extends XmlDeployer
{
- private final HornetQServerControl serverControl;
+ private final HornetQServer server;
private final FileConfigurationParser parser = new FileConfigurationParser();
- public QueueDeployer(final DeploymentManager deploymentManager, final HornetQServerControl serverControl)
+ public QueueDeployer(final DeploymentManager deploymentManager, final HornetQServer server)
{
super(deploymentManager);
- this.serverControl = serverControl;
+ this.server = server;
}
/**
@@ -64,10 +65,11 @@
{
CoreQueueConfiguration queueConfig = parser.parseQueueConfiguration(node);
- serverControl.deployQueue(queueConfig.getAddress(),
- queueConfig.getName(),
- queueConfig.getFilterString(),
- queueConfig.isDurable());
+ server.deployQueue(SimpleString.toSimpleString(queueConfig.getAddress()),
+ SimpleString.toSimpleString(queueConfig.getName()),
+ SimpleString.toSimpleString(queueConfig.getFilterString()),
+ queueConfig.isDurable(),
+ false);
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-04-06 10:03:01 UTC (rev 9061)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-04-06 15:30:28 UTC (rev 9062)
@@ -1045,7 +1045,7 @@
if (configuration.isFileDeploymentEnabled())
{
- queueDeployer = new QueueDeployer(deploymentManager, messagingServerControl);
+ queueDeployer = new QueueDeployer(deploymentManager, this);
queueDeployer.start();
}
Copied: trunk/tests/src/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java (from rev 9056, trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/core/deployers/impl/QueueDeployerTest.java 2010-04-06 15:30:28 UTC (rev 9062)
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.core.deployers.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.deployers.DeploymentManager;
+import org.hornetq.core.deployers.impl.FileDeploymentManager;
+import org.hornetq.core.deployers.impl.QueueDeployer;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.tests.util.UnitTestCase;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * A QueueDeployerTest
+ *
+ * @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class QueueDeployerTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private QueueDeployer deployer;
+
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testParseQueueConfiguration() throws Exception
+ {
+ String xml = "<configuration xmlns='urn:hornetq'>"
+ + " <queues>"
+ + " <queue name='foo'>"
+ + " <address>bar</address>"
+ + " <filter string='speed > 88' />"
+ + " <durable>false</durable>"
+ + " </queue>"
+ + " </queues>"
+ + "</configuration>";
+
+ Element rootNode = org.hornetq.utils.XMLUtil.stringToElement(xml);
+ deployer.validate(rootNode);
+ NodeList queueNodes = rootNode.getElementsByTagName("queue");
+ assertEquals(1, queueNodes.getLength());
+ deployer.deploy(queueNodes.item(0));
+
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("bar"));
+ assertEquals(1, bindings.getBindings().size());
+ Binding binding = bindings.getBindings().iterator().next();
+ assertTrue(binding instanceof LocalQueueBinding);
+ LocalQueueBinding queueBinding = (LocalQueueBinding)binding;
+
+ assertEquals("foo", queueBinding.getQueue().getName().toString());
+ assertEquals("speed > 88", queueBinding.getQueue().getFilter().getFilterString().toString());
+ assertEquals(false, queueBinding.getQueue().isDurable());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ DeploymentManager deploymentManager = new FileDeploymentManager(500);
+ server = new HornetQServerImpl();
+ server.start();
+ deployer = new QueueDeployer(deploymentManager, server);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ deployer = null;
+ server.stop();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2010-04-06 10:03:01 UTC (rev 9061)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2010-04-06 15:30:28 UTC (rev 9062)
@@ -1,619 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.deployers.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.management.HornetQServerControl;
-import org.hornetq.api.core.management.Parameter;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.deployers.DeploymentManager;
-import org.hornetq.core.deployers.impl.QueueDeployer;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.util.UnitTestCase;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-/**
- * A QueueDeployerTest
- *
- * @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
- */
-public class QueueDeployerTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private FakeServerControl serverControl;
-
- private QueueDeployer deployer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testParseQueueConfigurationFromAddressSettings() throws Exception
- {
- String xml = "<configuration xmlns='urn:hornetq'>" + " <queues>"
- + " <queue name='foo'>"
- + " <address>bar</address>"
- + " <filter string='speed > 88' />"
- + " <durable>false</durable>"
- + " </queue>"
- + " </queues>"
- + "</configuration>";
-
- Element rootNode = org.hornetq.utils.XMLUtil.stringToElement(xml);
- deployer.validate(rootNode);
- NodeList queueNodes = rootNode.getElementsByTagName("queue");
- Assert.assertEquals(1, queueNodes.getLength());
- deployer.deploy(queueNodes.item(0));
-
- Assert.assertEquals(1, serverControl.configs.size());
-
- CoreQueueConfiguration queueConfiguration = serverControl.configs.get(0);
- Assert.assertEquals("foo", queueConfiguration.getName());
- Assert.assertEquals("bar", queueConfiguration.getAddress());
- Assert.assertEquals("speed > 88", queueConfiguration.getFilterString());
- Assert.assertEquals(false, queueConfiguration.isDurable());
- }
-
- public void testParseQueueConfigurationFromHornetQConfiguration() throws Exception
- {
- String xml = "<configuration xmlns='urn:hornetq'> " + "<queues>"
- + " <queue name='foo'>"
- + " <address>bar</address>"
- + " <filter string='speed > 88' />"
- + " <durable>false</durable>"
- + " </queue>"
- + " <queue name='foo2'>"
- + " <address>bar2</address>"
- + " <filter string='speed > 88' />"
- + " <durable>true</durable>"
- + " </queue>"
- + "</queues>"
- + "</configuration>";
-
- Element rootNode = org.hornetq.utils.XMLUtil.stringToElement(xml);
- deployer.validate(rootNode);
- NodeList queueNodes = rootNode.getElementsByTagName("queue");
- Assert.assertEquals(2, queueNodes.getLength());
-
- deployer.deploy(queueNodes.item(0));
- deployer.deploy(queueNodes.item(1));
-
- Assert.assertEquals(2, serverControl.configs.size());
- Assert.assertEquals("foo", serverControl.configs.get(0).getName());
- Assert.assertEquals("foo2", serverControl.configs.get(1).getName());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- DeploymentManager deploymentManager = new FakeDeploymentManager();
- serverControl = new FakeServerControl();
- deployer = new QueueDeployer(deploymentManager, serverControl);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- deployer = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- private class FakeServerControl implements HornetQServerControl
- {
- public boolean isSharedStore()
- {
- return false;
- }
-
- public int getThreadPoolMaxSize()
- {
- return 0;
- }
-
- public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
- {
-
- return false;
- }
-
- public boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
- {
-
- return false;
- }
-
- public void createQueue(final String address, final String name, final String filter, final boolean durable) throws Exception
- {
-
- }
-
- public void createQueue(final String address, final String name) throws Exception
- {
-
- }
-
- public void createQueue(final String address, final String name, final boolean durable) throws Exception
- {
-
- }
-
- public String[] getAddressNames()
- {
- return null;
- }
-
- public String[] getQueueNames()
- {
- return null;
- }
-
- List<CoreQueueConfiguration> configs = new ArrayList<CoreQueueConfiguration>();
-
- public void deployQueue(final String address, final String name, final String filter, final boolean durable) throws Exception
- {
- CoreQueueConfiguration config = new CoreQueueConfiguration(address, name, filter, durable);
-
- configs.add(config);
- }
-
- public void deployQueue(final String address, final String name, final String filterString) throws Exception
- {
-
- }
-
- public void destroyQueue(final String name) throws Exception
- {
-
- }
-
- public void disableMessageCounters() throws Exception
- {
-
- }
-
- public void enableMessageCounters() throws Exception
- {
-
- }
-
- public String getBackupConnectorName()
- {
-
- return null;
- }
-
- public String getBindingsDirectory()
- {
-
- return null;
- }
-
- public Configuration getConfiguration()
- {
-
- return null;
- }
-
- public int getConnectionCount()
- {
-
- return 0;
- }
-
- public long getConnectionTTLOverride()
- {
-
- return 0;
- }
-
- public Object[] getConnectors() throws Exception
- {
-
- return null;
- }
-
- public String getConnectorsAsJSON() throws Exception
- {
- return null;
- }
-
- public int getIDCacheSize()
- {
-
- return 0;
- }
-
- public String[] getInterceptorClassNames()
- {
-
- return null;
- }
-
- public int getJournalBufferReuseSize()
- {
-
- return 0;
- }
-
- public String getJournalDirectory()
- {
-
- return null;
- }
-
- public int getJournalFileSize()
- {
-
- return 0;
- }
-
- public int getJournalMaxIO()
- {
-
- return 0;
- }
-
- public int getJournalMinFiles()
- {
-
- return 0;
- }
-
- public String getJournalType()
- {
-
- return null;
- }
-
- public String getLargeMessagesDirectory()
- {
-
- return null;
- }
-
- public String getManagementAddress()
- {
-
- return null;
- }
-
- public String getManagementNotificationAddress()
- {
-
- return null;
- }
-
- public long getManagementRequestTimeout()
- {
-
- return 0;
- }
-
- public int getMessageCounterMaxDayCount()
- {
-
- return 0;
- }
-
- public long getMessageCounterSamplePeriod()
- {
-
- return 0;
- }
-
- public long getMessageExpiryScanPeriod()
- {
-
- return 0;
- }
-
- public long getMessageExpiryThreadPriority()
- {
-
- return 0;
- }
-
- public String getPagingDirectory()
- {
-
- return null;
- }
-
- public long getQueueActivationTimeout()
- {
-
- return 0;
- }
-
- public int getScheduledThreadPoolMaxSize()
- {
-
- return 0;
- }
-
- public long getSecurityInvalidationInterval()
- {
-
- return 0;
- }
-
- public long getTransactionTimeout()
- {
-
- return 0;
- }
-
- public long getTransactionTimeoutScanPeriod()
- {
-
- return 0;
- }
-
- public String getVersion()
- {
-
- return null;
- }
-
- public boolean isBackup()
- {
-
- return false;
- }
-
- public boolean isClustered()
- {
-
- return false;
- }
-
- public boolean isCreateBindingsDir()
- {
-
- return false;
- }
-
- public boolean isCreateJournalDir()
- {
-
- return false;
- }
-
- public boolean isJournalSyncNonTransactional()
- {
-
- return false;
- }
-
- public boolean isJournalSyncTransactional()
- {
-
- return false;
- }
-
- public boolean isMessageCounterEnabled()
- {
-
- return false;
- }
-
- public boolean isPersistDeliveryCountBeforeDelivery()
- {
-
- return false;
- }
-
- public boolean isAsyncConnectionExecutionEnabled()
- {
- return false;
- }
-
- public boolean isPersistIDCache()
- {
-
- return false;
- }
-
- public boolean isSecurityEnabled()
- {
-
- return false;
- }
-
- public boolean isStarted()
- {
-
- return false;
- }
-
- public boolean isWildcardRoutingEnabled()
- {
-
- return false;
- }
-
- public String[] listConnectionIDs() throws Exception
- {
-
- return null;
- }
-
- public String[] listPreparedTransactions() throws Exception
- {
-
- return null;
- }
-
- public String[] listHeuristicCommittedTransactions() throws Exception
- {
- return null;
- }
-
- public String[] listHeuristicRolledBackTransactions() throws Exception
- {
- return null;
- }
-
- public String[] listRemoteAddresses() throws Exception
- {
-
- return null;
- }
-
- public String[] listRemoteAddresses(final String ipAddress) throws Exception
- {
-
- return null;
- }
-
- public String[] listSessions(final String connectionID) throws Exception
- {
-
- return null;
- }
-
- public void resetAllMessageCounterHistories() throws Exception
- {
-
- }
-
- public void resetAllMessageCounters() throws Exception
- {
-
- }
-
- public boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
- {
-
- return false;
- }
-
- public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
- {
-
- }
-
- public void setMessageCounterMaxDayCount(final int count) throws Exception
- {
-
- }
-
- public void setMessageCounterSamplePeriod(final long newPeriod) throws Exception
- {
-
- }
-
- public int getJournalBufferSize()
- {
-
- return 0;
- }
-
- public int getJournalBufferTimeout()
- {
- return 0;
- }
-
- public int getJournalCompactMinFiles()
- {
- return 0;
- }
-
- public int getJournalCompactPercentage()
- {
- return 0;
- }
-
- public boolean isPersistenceEnabled()
- {
- return false;
- }
-
- public void addSecuritySettings(String addressMatch,
- String sendRoles,
- String consumeRoles,
- String createDurableQueueRoles,
- String deleteDurableQueueRoles,
- String createTempQueueRoles,
- String deleteTempQueueRoles,
- String manageRoles) throws Exception
- {
- }
-
- public void removeSecuritySettings(String addressMatch) throws Exception
- {
- }
-
- public Set<Role> getSecuritySettings(String addressMatch) throws Exception
- {
- return null;
- }
-
- public Object[] getRoles(String addressMatch) throws Exception
- {
- return null;
- }
-
- public String getRolesAsJSON(String addressMatch) throws Exception
- {
- return null;
- }
-
- public void addAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch, @Parameter(desc = "the dead letter address setting", name = "DLA") String DLA, @Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress, @Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue, @Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts, @Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes, @Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes, @Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay, @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay, @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute, @Parameter(desc = "the ploicy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy) throws Exception
- {
-
- }
-
- public AddressSettings getAddressSettings(String address)
- {
- return null;
- }
-
- public void removeAddressSettings(String addressMatch)
- {
-
- }
-
- public String getAddressSettingsAsJSON(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch) throws Exception
- {
- return null;
- }
- }
-
-}
JBoss hornetq SVN: r9061 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-06 06:03:01 -0400 (Tue, 06 Apr 2010)
New Revision: 9061
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/connection-ttl.xml
Log:
done
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/connection-ttl.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/connection-ttl.xml 2010-04-06 09:46:56 UTC (rev 9060)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/connection-ttl.xml 2010-04-06 10:03:01 UTC (rev 9061)
@@ -17,17 +17,14 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="connection-ttl">
- <title>Detecting Dead Connections</title>
- <para>In this section we will discuss connection time-to-live (TTL) and explain how HornetQ
- deals with crashed clients and clients which have exited without cleanly closing their
- resources.</para>
+ <title>失效连接的检测</title>
+ <para>本章将讨论连接的生存时间(TTL)以及HornetQ如何处理出现故障的客户端或者异常退出的客户端(即客户端在
+ 退出时没有合理的关闭相关资源)。</para>
<section id="dead.connections">
- <title>Cleaning up Dead Connection Resources on the Server</title>
- <para>Before a HornetQ client application exits it is considered good practice that it
- should close its resources in a controlled manner, using a <literal>finally</literal>
- block.</para>
- <para>Here's an example of a well behaved core client application closing its session and
- session factory in a finally block:</para>
+ <title>服务器端对失效连接的清除</title>
+ <para>当客户端的应用程序退出时,应该关闭所使用的资源。在<literal>finally</literal>进行资源的关闭
+ 是一个很好的方法。</para>
+ <para>下面的例子中,一个Hornet客户端在finally中关闭了它的会话(session)和会话工厂(session factory):</para>
<programlisting>
ClientSessionFactory sf = null;
ClientSession session = null;
@@ -53,7 +50,7 @@
}
}
</programlisting>
- <para>And here's an example of a well behaved JMS client application:</para>
+ <para>下面的例子给出了一个JMS客户端是如何适当关闭相关资源的:</para>
<programlisting>
Connection jmsConnection = null;
@@ -73,47 +70,31 @@
}
}
</programlisting>
- <para>Unfortunately users don't always write well behaved applications, and sometimes
- clients just crash so they don't have a chance to clean up their resources!</para>
- <para>If this occurs then it can leave server side resources, like sessions, hanging on the
- server. If these were not removed they would cause a resource leak on the server and
- over time this result in the server running out of memory or other resources.</para>
- <para>We have to balance the requirement for cleaning up dead client resources with the fact
- that sometimes the network between the client and the server can fail and then come
- back, allowing the client to reconnect. HornetQ supports client reconnection, so we
- don't want to clean up "dead" server side resources too soon or this will prevent any
- client from reconnecting, as it won't be able to find its old sessions on the
- server.</para>
- <para>HornetQ makes all of this configurable. For each <literal
- >ClientSessionFactory</literal> we define a <emphasis>connection TTL</emphasis>.
- Basically, the TTL determines how long the server will keep a connection alive in the
- absence of any data arriving from the client. The client will automatically send "ping"
- packets periodically to prevent the server from closing it down. If the server doesn't
- receive any packets on a connection for the connection TTL time, then it will
- automatically close all the sessions on the server that relate to that
- connection.</para>
- <para>If you're using JMS, the connection TTL is defined by the <literal
- >ConnectionTTL</literal> attribute on a <literal>HornetQConnectionFactory</literal>
- instance, or if you're deploying JMS connection factory instances direct into JNDI on
- the server side, you can specify it in the xml config, using the parameter <literal
- >connection-ttl</literal>.</para>
- <para>The default value for connection ttl is <literal>60000</literal>ms, i.e. 1 minute. A
- value of <literal>-1</literal> for <literal>ConnectionTTL</literal> means the server
- will never time out the connection on the server side.</para>
- <para>If you do not wish clients to be able to specify their own connection TTL, you can
- override all values used by a global value set on the server side. This can be done by
- specifying the <literal>connection-ttl-override</literal> attribute in the server side
- configuration. The default value for <literal>connection-ttl-override</literal> is
- <literal>-1</literal> which means "do not override" (i.e. let clients use their own
- values).</para>
+ <para>然而有时候资源在客户端得不到合理的关闭。有的客户端应用在结束时忘记了关闭资源,有的客户端有时发生故障导致
+ 程序突然中断,相关资源也没有来得及关闭!</para>
+ <para>如果上述情况发生了,那么这些资源就会留在服务器端而不会被清理。这就会造成资源泄漏现象并最終导致服务器内存
+ 溢出或其它资源的溢出错误。</para>
+ <para>因此在服务器端要有某种机制来避免资源的泄漏。也就是对无效资源进行回收。在判断什么是无效资源时,HornetQ
+ 考虑到了客户端重新连接的情况。就是当一个连接由于网络临时中断后又恢复正常时,客户端有可能通过不断重试
+ 成功地连接到服务器端。如果服务器端过早清除了相关的连接资源,则客户端就可能重试失败。</para>
+ <para>HornetQ的资源回收是完全可配置的。每个 <literal
+ >ClientSessionFactory</literal> 有一个<emphasis>连接 TTL</emphasis>的参数。
+ 这个参数的意义是当客户端的一个连接没有任何数到达服务器时,服务器充许这个连接有效的最长时间。客户端通过定
+ 时向服务器端发送“ping“数据包来维持连接的有效,以免被服务器关掉。如果服务器在TTL指定的时间内没有收到任何
+ 数据包,则认为该连接无效,继而关闭与该连接相关的所有的会话(session)。</para>
+ <para>如果使用JMS,<literal>HornetQConnectionFactory</literal>的<literal>ConnectionTTL</literal>
+ 属性是用来定义连接的存活时间的。如果你将JMS连接工厂部署到JNDI中,则应使用配置文件中的<literal
+ >connection-ttl</literal>参数来定义连接的TTL。</para>
+ <para>默认的连接TTL值是<literal>60000</literal>毫秒,即一分钟。 <literal>ConnectionTTL</literal>
+ 设为<literal>-1</literal>表示服务器永远不检测超时的连接。</para>
+ <para>如果你不想让客户端来规定连接存活时间(TTL),你可以在服务器端的配置文件中定义
+ <literal>connection-ttl-override</literal>属性。它的默认值是<literal>-1</literal>,表示
+ 服务器端该属性无效(即客户端可以定义自己的连接TTL)。</para>
<section>
- <title>Closing core sessions or JMS connections that you have failed to close</title>
- <para>As previously discussed, it's important that all core client sessions and JMS
- connections are always closed explicitly in a <literal>finally</literal> block when
- you are finished using them. </para>
- <para>If you fail to do so, HornetQ will detect this at garbage collection time, and log
- a warning similar to the following in the logs (If you are using JMS the warning
- will involve a JMS connection not a client session):</para>
+ <title>关闭没有被成功关闭的核心会话或JMS连接</title>
+ <para>如前所述,在使用完毕后在<literal>finally</literal>中将所有的核心会话或JMS连接关闭是十分重要的。</para>
+ <para>如果你没有这样做,HornetQ会在拉圾回收时进行检测并会在日志中打印类似以下的警告(如果是JMS则在警告中
+ 是相应的JMS连接):</para>
<programlisting>
[Finalizer] 20:14:43,244 WARNING [org.hornetq.core.client.impl.DelegatingSession] I'm closin
@@ -126,48 +107,33 @@
at org.acme.yourproject.YourClass (YourClass.java:666)
</programlisting>
- <para>HornetQ will then close the connection / client session for you.</para>
- <para>Note that the log will also tell you the exact line of your user code where you
- created the JMS connection / client session that you later did not close. This will
- enable you to pinpoint the error in your code and correct it appropriately.</para>
+ <para>HornetQ然后将未关闭的连接/会话关闭。</para>
+ <para>注意在日志的警告中还给出了创建JMS连接/客户端会话的具体行号,以便准确地确定出错的地方。</para>
</section>
</section>
<section>
- <title>Detecting failure from the client side.</title>
- <para>In the previous section we discussed how the client sends pings to the server and how
- "dead" connection resources are cleaned up by the server. There's also another reason
- for pinging, and that's for the <emphasis>client</emphasis> to be able to detect that
- the server or network has failed.</para>
- <para>As long as the client is receiving data from the server it will consider the
- connection to be still alive. </para>
- <para>If the client does not receive any packets for <literal
- >client-failure-check-period</literal> milliseconds then it will consider the
- connection failed and will either initiate failover, or call any <literal
- >FailureListener</literal> instances (or <literal>ExceptionListener</literal>
- instances if you are using JMS) depending on how it has been configured.</para>
- <para>If you're using JMS it's defined by the <literal>ClientFailureCheckPeriod</literal>
- attribute on a <literal>HornetQConnectionFactory</literal> instance, or if you're
- deploying JMS connection factory instances direct into JNDI on the server side, you can
- specify it in the <literal>hornetq-jms.xml </literal> configuration file, using the
- parameter <literal>client-failure-check-period</literal>.</para>
- <para>The default value for client failure check period is <literal>30000</literal>ms, i.e.
- 30 seconds. A value of <literal>-1</literal> means the client will never fail the
- connection on the client side if no data is received from the server. Typically this is
- much lower than connection TTL to allow clients to reconnect in case of transitory
- failure.</para>
+ <title>客户端的故障检测</title>
+ <para>前面讲述了客户端如何向服务器发送ping以及服务器端如何清理失效的连接。发送ping还有另外一个目的,就是能
+ 让客户端检测网络或服务器是否出现故障。</para>
+ <para>从客户端的角度来看,只要客户端能从一个连接不断接收服务器的数据,那么这个连接就被认为是一个有效的连接。</para>
+ <para>如果在属性<literal>client-failure-check-period</literal>所定义的时间内(单位毫秒)客户端没有
+ 收到任何数据,客户端就认为这们连接发生了故障。根据不同的配置,客户端在这种情况下要么进行failover,要么
+ 调用<literal>FailureListener</literal>的接口(或者是JMS的<literal>ExceptionListener</literal>)。</para>
+ <para>如果使用JMS,这个参数是<literal>HornetQConnectionFactory</literal>的<literal>ClientFailureCheckPeriod</literal>。
+ 如果你向JNDI部署JMS连接工厂,那么相应的参数在<literal>hornetq-jms.xml</literal>配置文件中,参数名
+ 为<literal>client-failure-check-period</literal>。</para>
+ <para>这个参数的默认值是<literal>30000</literal>毫秒,即半分钟。<literal>-1</literal>表示客户端不检查
+ 连接的有效性。即不论是否有数据来自服务器,连接都一直有效。这一参数通常要比服务器端的连接TTL小许多,以使
+ 客户端在出现短暂连接故障的情况下可以与服务器成功地重新连接。</para>
</section>
<section id="connection-ttl.async-connection-execution">
- <title>Configuring Asynchronous Connection Execution</title>
- <para>By default, packets received on the server side are executed on the remoting
- thread.</para>
- <para>It is possible instead to use a thread from a thread pool to handle some packets so
- that the remoting thread is not tied up for too long. However, please note that
- processing operations asynchronously on another thread adds a little more latency.
- Please note that most short running operations are always handled on the remoting thread for performance reasons.
-
- To enable asynchronous connection execution, set the parameter <literal
- >async-connection-execution-enabled</literal> in <literal
- >hornetq-configuration.xml</literal> to <literal>true</literal> (default value is
- <literal>true</literal>).</para>
+ <title>配置异步连接任务执行</title>
+ <para>默认情况下,服务器接收到的数据包被远程模块的线程处理。</para>
+ <para>为了避免远程模块的线程被长时间占用,数据包可以转给另外的一个线程池来处理。要注意这样做的增加了一些时间延迟。
+ 因此如果数据包处理耗时很少,还是由远程模块线程来处理较好。
+
+ 要配置这样的异步连接很行任务,将<literal>hornetq-configuration.xml</literal>文件中的
+ <literal>async-connection-execution-enabled</literal> 参数设为<literal>true</literal>
+ (默认值是 <literal>true</literal>)。</para>
</section>
</chapter>