JBoss hornetq SVN: r8694 - trunk/src/main/org/hornetq/core/message/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-15 10:09:02 -0500 (Tue, 15 Dec 2009)
New Revision: 8694
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
buffer fix
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-14 19:31:42 UTC (rev 8693)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-15 15:09:02 UTC (rev 8694)
@@ -838,8 +838,17 @@
// write it
buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
- // Position at end of body and skip past the message end position int
- buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
+ // Position at end of body and skip past the message end position int.
+ // check for enough room in the buffer even tho it is dynamic
+ if((endOfBodyPosition + 4) > buffer.capacity())
+ {
+ buffer.setIndex(0, endOfBodyPosition);
+ buffer.writeInt(0);
+ }
+ else
+ {
+ buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
+ }
encodeHeadersAndProperties(buffer);
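Note on the fix above: the new branch exists because moving an index does not enlarge a dynamic buffer, whereas a write does. The following is a minimal sketch of that distinction, not the HornetQ code. GrowableBuffer and its methods are hypothetical names built on java.nio.ByteBuffer, chosen only so the example runs without Netty on the classpath.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a dynamic buffer; not HornetQ's or Netty's class.
final class GrowableBuffer {
    private ByteBuffer data;
    private int writerIndex;

    GrowableBuffer(int initialCapacity) {
        data = ByteBuffer.allocate(initialCapacity);
    }

    int capacity() {
        return data.capacity();
    }

    int writerIndex() {
        return writerIndex;
    }

    // Moving the index never allocates, so it has to stay within the capacity.
    void setWriterIndex(int index) {
        if (index < 0 || index > data.capacity()) {
            throw new IndexOutOfBoundsException("index " + index + ", capacity " + data.capacity());
        }
        writerIndex = index;
    }

    // Writing enlarges the backing storage first when there is not enough room.
    void writeInt(int value) {
        ensureCapacity(writerIndex + Integer.BYTES);
        data.putInt(writerIndex, value);
        writerIndex += Integer.BYTES;
    }

    private void ensureCapacity(int needed) {
        if (needed <= data.capacity()) {
            return;
        }
        ByteBuffer grown = ByteBuffer.allocate(Math.max(needed, data.capacity() * 2));
        grown.put(data.duplicate()); // copy the existing bytes
        data = grown;
    }

    // Same shape as the committed fix: write a placeholder int when the skip
    // would land past the capacity, otherwise just move the index.
    void skipInt(int endOfBody) {
        if (endOfBody + Integer.BYTES > capacity()) {
            setWriterIndex(endOfBody);
            writeInt(0);
        } else {
            setWriterIndex(endOfBody + Integer.BYTES);
        }
    }

    public static void main(String[] args) {
        GrowableBuffer small = new GrowableBuffer(10);
        small.skipInt(8);
        System.out.println("capacity after skip: " + small.capacity());
    }
}
```

With an initial capacity of 10, skipInt(8) takes the first branch and the write enlarges the storage; with a capacity of 32 it takes the second branch and only moves the index.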
JBoss hornetq SVN: r8693 - tags.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-14 14:31:42 -0500 (Mon, 14 Dec 2009)
New Revision: 8693
Added:
tags/HornetQ_2_0_0_CR2/
Log:
tagged HornetQ 2.0.0.CR2
Copied: tags/HornetQ_2_0_0_CR2 (from rev 8692, tags/HornetQ_2_0_0_CR2_PENDING)
JBoss hornetq SVN: r8692 - tags.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-14 13:53:58 -0500 (Mon, 14 Dec 2009)
New Revision: 8692
Added:
tags/HornetQ_2_0_0_CR2_PENDING/
Log:
created pending tag for 2.0.0.CR2 release
Copied: tags/HornetQ_2_0_0_CR2_PENDING (from rev 8691, trunk)
JBoss hornetq SVN: r8691 - tags.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-14 13:52:02 -0500 (Mon, 14 Dec 2009)
New Revision: 8691
Removed:
tags/HornetQ_2_0_0_CR2_PENDING/
Log:
removed pending tag
JBoss hornetq SVN: r8690 - trunk/src/config/common.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-14 13:49:03 -0500 (Mon, 14 Dec 2009)
New Revision: 8690
Modified:
trunk/src/config/common/hornetq-version.properties
Log:
updated versionName for 2.0.0.CR2
Modified: trunk/src/config/common/hornetq-version.properties
===================================================================
--- trunk/src/config/common/hornetq-version.properties 2009-12-14 12:58:06 UTC (rev 8689)
+++ trunk/src/config/common/hornetq-version.properties 2009-12-14 18:49:03 UTC (rev 8690)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=stinger
+hornetq.version.versionName=vespa
hornetq.version.majorVersion=2
hornetq.version.minorVersion=0
hornetq.version.microVersion=0
JBoss hornetq SVN: r8682 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-11 13:13:36 -0500 (Fri, 11 Dec 2009)
New Revision: 8682
Modified:
trunk/docs/user-manual/en/ha.xml
trunk/docs/user-manual/en/perf-tuning.xml
Log:
some tweaks to docs
Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml 2009-12-11 17:53:34 UTC (rev 8681)
+++ trunk/docs/user-manual/en/ha.xml 2009-12-11 18:13:36 UTC (rev 8682)
@@ -37,6 +37,10 @@
<para>HornetQ provides two different modes for high availability, either by
<emphasis>replicating data</emphasis> from the live server journal to the backup
server or using a <emphasis>shared store</emphasis> for both servers.</para>
+ <note>
+ <para>Only persistent message data will survive failover. Any non persistent message
+ data will not be available after failover.</para>
+ </note>
<section id="ha.mode.replicated">
<title>Data Replication</title>
<para>In this mode, data stored in the HornetQ journal are replicated from the live
@@ -196,6 +200,9 @@
same server (e.g. in case of transient network problems). This is similar to failover,
except it's reconnecting to the same server and is discussed in <xref
linkend="client-reconnection"/></para>
+ <para>During failover, if the client has consumers on any non persistent or temporary
+ queues, those queues will be automatically recreated during failover on the backup node,
+ since the backup node will not have any knowledge of non persistent queues.</para>
<section id="ha.automatic.failover">
<title>Automatic Client Failover</title>
<para>HornetQ clients can be configured with knowledge of live and backup servers, so
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2009-12-11 17:53:34 UTC (rev 8681)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2009-12-11 18:13:36 UTC (rev 8682)
@@ -95,10 +95,9 @@
acknowledgements with one acknowledge/commit. </para>
</listitem>
<listitem>
- <para>Avoid durable messages. By default JMS messages are durable. If you
- don't really need durable messages then set them to be non-durable.
- Durable messages incur a lot more overhead in persisting them to
- storage.</para>
+ <para>Avoid durable messages. By default JMS messages are durable. If you don't
+ really need durable messages then set them to be non-durable. Durable messages
+ incur a lot more overhead in persisting them to storage.</para>
</listitem>
</itemizedlist>
</section>
@@ -107,10 +106,10 @@
<para>There are various other places in HornetQ where we can perform some tuning:</para>
<itemizedlist>
<listitem>
- <para>Use Asynchronous Send Acknowledgements. If you need to send durable
- messages non transactionally and you need a guarantee that they have reached the
- server by the time the call to send() returns, don't set durable messages to
- be sent blocking, instead use asynchronous send acknowledgements to get your
+ <para>Use Asynchronous Send Acknowledgements. If you need to send durable messages
+ non transactionally and you need a guarantee that they have reached the server
+ by the time the call to send() returns, don't set durable messages to be sent
+ blocking, instead use asynchronous send acknowledgements to get your
acknowledgements of send back in a separate stream, see <xref
linkend="send-guarantees"/> for more information on this.</para>
</listitem>
@@ -143,22 +142,36 @@
>journal-sync-non-transactional</literal> to <literal>false</literal> in
<literal>hornetq-configuration.xml</literal> can give you better
non-transactional persistent performance at the expense of some possibility of
- loss of durable messages on failure. See <xref linkend="send-guarantees"/>
- for more information.</para>
+ loss of durable messages on failure. See <xref linkend="send-guarantees"/> for
+ more information.</para>
</listitem>
<listitem>
- <para>Send messages non blocking. Setting <literal
- >block-on-durable-send</literal> and <literal
- >block-on-non-durable-send</literal> to <literal>false</literal> in
+ <para>Send messages non blocking. Setting <literal>block-on-durable-send</literal>
+ and <literal>block-on-non-durable-send</literal> to <literal>false</literal> in
<literal>hornetq-jms.xml</literal> (if you're using JMS and JNDI) or
directly on the ClientSessionFactory. This means you don't have to wait a whole
network round trip for every message sent. See <xref linkend="send-guarantees"/>
for more information.</para>
</listitem>
<listitem>
+ <para>Socket NIO vs Socket Old IO. By default HornetQ uses Socket NIO on the server
+ and old (blocking) IO on the client side (see the chapter on configuring
+ transports for more information <xref linkend="configuring-transports"/>). NIO
+ is much more scalable but can give you some latency hit compared to old blocking
+ IO. If you expect to be able to service many thousands of connections on the
+ server, then continue to use NIO on the server. However, if don't expect many
+ thousands of connections on the server you can configure the server acceptors to
+ use old IO, and might get a small performance advantage.</para>
+ </listitem>
+ <listitem>
<para>Use the core API not JMS. Using the JMS API you will have slightly lower
performance than using the core API, since all JMS operations need to be
- translated into core operations before the server can handle them.</para>
+ translated into core operations before the server can handle them. If using the
+ core API try to use methods that take <literal>SimpleString</literal> as much as
+ possible. <literal>SimpleString</literal>, unlike java.lang.String does not
+ require copying before it is written to the wire, so if you re-use <literal
+ >SimpleString</literal> instances between calls then you can avoid some
+ unnecessary copying.</para>
</listitem>
</itemizedlist>
</section>
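The SimpleString advice in the perf-tuning text above comes down to encoding a name once and reusing the bytes. A rough sketch of that idea follows; PreEncodedName and WireWriter are hypothetical classes, and the UTF-16LE encoding is an assumption made for the example rather than a statement about HornetQ's wire format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is not HornetQ's SimpleString.
final class PreEncodedName {
    private final byte[] encoded; // encoded once, at construction

    PreEncodedName(String value) {
        this.encoded = value.getBytes(StandardCharsets.UTF_16LE);
    }

    // Writes the cached bytes directly: no per-call encoding, no new array.
    void writeTo(ByteArrayOutputStream out) {
        out.write(encoded, 0, encoded.length);
    }
}

final class WireWriter {
    // A plain String has to be encoded into a fresh byte[] on every call.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] copy = value.getBytes(StandardCharsets.UTF_16LE);
        out.write(copy, 0, copy.length);
    }

    public static void main(String[] args) {
        PreEncodedName address = new PreEncodedName("example.address");
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (int i = 0; i < 3; i++) {
            address.writeTo(wire); // the same instance is reused between calls
        }
        System.out.println("bytes written: " + wire.size());
    }
}
```

Both paths put the same bytes on the wire; the difference is that the pre-encoded instance pays the encoding cost once, which is why reusing instances between calls matters.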
JBoss hornetq SVN: r8681 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-11 12:53:34 -0500 (Fri, 11 Dec 2009)
New Revision: 8681
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
Log:
removed unnecessary copy
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-11 17:20:56 UTC (rev 8680)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-11 17:53:34 UTC (rev 8681)
@@ -45,13 +45,6 @@
super(PacketImpl.SESS_SEND, message);
this.requiresResponse = requiresResponse;
-
- // If the message hasn't already been copied when the headers/properties/body was changed since last send
- // (which will prompt an invalidate(), which will cause a copy if not copied already)
- // Then the message needs to be copied before sending - the previous send may be in the Netty write queue
- // so we can't just use the same buffer. Also we can't just duplicate, since the extra data (requiresResponse)
- // may be different on different calls
- message.checkCopy();
}
public SessionSendMessage()
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-11 17:20:56 UTC (rev 8680)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-11 17:53:34 UTC (rev 8681)
@@ -51,7 +51,7 @@
static
{
- // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
+ // This is an estimate of how much memory a ServerMessageImpl takes up, excluding body and properties
// Note, it is only an estimate, it's not possible to be entirely sure with Java
// This figure is calculated using the test utilities in org.hornetq.tests.unit.util.sizeof
// The value is somewhat higher on 64 bit architectures, probably due to different alignment
JBoss hornetq SVN: r8680 - in trunk/src/main/org/hornetq: integration/transports/netty and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-11 12:20:56 -0500 (Fri, 11 Dec 2009)
New Revision: 8680
Modified:
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
some adjustments on buffer sizes
Modified: trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2009-12-11 16:50:36 UTC (rev 8679)
+++ trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2009-12-11 17:20:56 UTC (rev 8680)
@@ -96,9 +96,8 @@
private static final Logger log = Logger.getLogger(ManagementServiceImpl.class);
private final MBeanServer mbeanServer;
+ private final boolean jmxManagementEnabled;
- private final boolean jmxManagementEnabled;
-
private final Map<String, Object> registry;
private final NotificationBroadcasterSupport broadcaster;
@@ -427,7 +426,7 @@
public ServerMessage handleMessage(final ServerMessage message) throws Exception
{
// a reply message is sent with the result stored in the message body.
- ServerMessage reply = new ServerMessageImpl(storageManager.generateUniqueID(), 512);
+ ServerMessage reply = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
if (ManagementServiceImpl.log.isDebugEnabled())
@@ -719,7 +718,7 @@
long messageID = storageManager.generateUniqueID();
- ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
+ ServerMessage notificationMessage = new ServerMessageImpl(messageID, 50);
// Notification messages are always durable so the user can choose whether to add a durable queue to
// consume
Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-12-11 16:50:36 UTC (rev 8679)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-12-11 17:20:56 UTC (rev 8680)
@@ -13,6 +13,7 @@
package org.hornetq.integration.transports.netty;
+import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -33,6 +34,8 @@
@ChannelPipelineCoverage("one")
public class HornetQFrameDecoder2 extends SimpleChannelUpstreamHandler
{
+ private static final Logger log = Logger.getLogger(HornetQFrameDecoder2.class);
+
private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
// SimpleChannelUpstreamHandler overrides
@@ -46,8 +49,9 @@
{
if (previousData.readableBytes() + in.readableBytes() < DataConstants.SIZE_INT)
{
- // XXX Length is unknown. Bet at 512. Tune this value.
- append(in, 512);
+ // XXX Length is unknown. Bet at 100. Tune this value.
+ //In most cases this won't occur
+ append(in, 100);
return;
}
@@ -111,8 +115,8 @@
{
// XXX Tune this value: Increasing the initial capacity of the
// dynamic buffer might reduce the chance of additional memory
- // copy.
- frame = ChannelBuffers.dynamicBuffer(length + 4);
+ // copy.
+ frame = ChannelBuffers.dynamicBuffer(length + 4);
frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
frame.writeBytes(in, length + 4 - frame.writerIndex());
}
@@ -164,8 +168,8 @@
// Convert to dynamic buffer (this requires copy)
// XXX Tune this value: Increasing the initial capacity of the dynamic
- // buffer might reduce the chance of additional memory copy.
- ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + DataConstants.SIZE_INT);
+ // buffer might reduce the chance of additional memory copy.
+ ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + DataConstants.SIZE_INT);
frame.writeBytes(in, length + DataConstants.SIZE_INT);
frame.skipBytes(DataConstants.SIZE_INT);
Channels.fireMessageReceived(ctx, frame);
@@ -182,7 +186,7 @@
previousData.writeBytes(in);
}
else
- {
+ {
ChannelBuffer newPreviousData = ChannelBuffers.dynamicBuffer(Math.max(previousData.readableBytes() + in.readableBytes(),
length + 4));
newPreviousData.writeBytes(previousData);
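The decoder changed above reads frames that start with a 4-byte length (DataConstants.SIZE_INT) and keeps incomplete input in previousData until the rest arrives. A minimal sketch of that buffering technique, assuming plain byte arrays and java.nio.ByteBuffer instead of Netty's ChannelBuffer; LengthPrefixedDecoder is a hypothetical name and does not reproduce HornetQFrameDecoder2:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing; not the HornetQ decoder.
final class LengthPrefixedDecoder {
    private static final int SIZE_INT = 4;

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Feed one chunk as it arrives; returns every frame payload now complete.
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, joined, 0, pending.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(joined);
        while (in.remaining() >= SIZE_INT) {
            int length = in.getInt(in.position()); // peek, do not consume yet
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (in.remaining() - SIZE_INT < length) {
                break; // body incomplete, wait for more data
            }
            in.position(in.position() + SIZE_INT); // skip the length prefix
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }

        // Keep whatever is left for the next call.
        pending = new byte[in.remaining()];
        in.get(pending);
        return frames;
    }

    public static void main(String[] args) {
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        byte[] wire = ByteBuffer.allocate(7).putInt(3).put(new byte[] {1, 2, 3}).array();
        System.out.println("frames decoded: " + decoder.feed(wire).size());
    }
}
```

This sketch re-copies the pending bytes on every call for simplicity. The diff above is about avoiding that kind of extra copy by choosing a sensible initial capacity for the dynamic buffer.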
JBoss hornetq SVN: r8679 - in trunk/examples: core/embedded and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-11 11:50:36 -0500 (Fri, 11 Dec 2009)
New Revision: 8679
Modified:
trunk/examples/common/build.xml
trunk/examples/core/embedded/build.xml
Log:
fixed examples
* netty is a required dependency to HornetQ core
Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml 2009-12-11 16:40:21 UTC (rev 8678)
+++ trunk/examples/common/build.xml 2009-12-11 16:50:36 UTC (rev 8679)
@@ -72,6 +72,7 @@
<fileset dir="${hornetq.jars.dir}">
<include name="**/*client*.jar"/>
<include name="**/jboss-jms-api.jar"/>
+ <include name="**/netty.jar"/>
</fileset>
<fileset dir="${jars.dir}">
<include name="**/jboss-jms-api.jar"/>
Modified: trunk/examples/core/embedded/build.xml
===================================================================
--- trunk/examples/core/embedded/build.xml 2009-12-11 16:40:21 UTC (rev 8678)
+++ trunk/examples/core/embedded/build.xml 2009-12-11 16:50:36 UTC (rev 8679)
@@ -22,9 +22,12 @@
<!-- We use a minimal classpath on this example -->
<path id="local.classpath">
<fileset dir="${hornetq.jars.dir}">
- <include name="hornetq-core.jar"/>
- <include name="netty.jar"/>
+ <include name="hornetq-core.jar"/>
+ <include name="hornetq-transports.jar"/>
</fileset>
+ <fileset dir="${jars.dir}">
+ <include name="**/netty*jar"/>
+ </fileset>
<pathelement location="${classes.dir}"/>
</path>
JBoss hornetq SVN: r8678 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-11 11:40:21 -0500 (Fri, 11 Dec 2009)
New Revision: 8678
Added:
trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
fixed concurrency issue when sending same message multiple threads
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-11 16:08:20 UTC (rev 8677)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-11 16:40:21 UTC (rev 8678)
@@ -374,7 +374,7 @@
endOfBodyPosition = -1;
}
- public void checkCopy()
+ public synchronized void checkCopy()
{
if (!copied)
{
@@ -444,7 +444,7 @@
buffer.setIndex(0, endOfMessagePosition);
bufferUsed = true;
-
+
return buffer;
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java 2009-12-11 16:40:21 UTC (rev 8678)
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ *
+ * A MessageConcurrencyTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class MessageConcurrencyTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ConsumerTest.class);
+
+ private HornetQServer server;
+
+ private final SimpleString ADDRESS = new SimpleString("MessageConcurrencyTestAddress");
+
+ private final SimpleString QUEUE_NAME = new SimpleString("MessageConcurrencyTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ // Test that a created message can be sent via multiple producers on different sessions concurrently
+ public void testMessageConcurrency() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession createSession = sf.createSession();
+
+ Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+ Set<Sender> senders = new HashSet<Sender>();
+
+ final int numSessions = 100;
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ ClientSession sendSession = sf.createSession();
+
+ sendSessions.add(sendSession);
+
+ ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+ Sender sender = new Sender(numMessages, producer);
+
+ senders.add(sender);
+
+ sender.start();
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ byte[] body = RandomUtil.randomBytes(1000);
+
+ ClientMessage message = createSession.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(body);
+
+ for (Sender sender: senders)
+ {
+ sender.queue.add(message);
+ }
+ }
+
+ for (Sender sender: senders)
+ {
+ sender.join();
+
+ assertFalse(sender.failed);
+ }
+
+ for (ClientSession sendSession: sendSessions)
+ {
+ sendSession.close();
+ }
+
+ createSession.close();
+
+ sf.close();
+ }
+
+ // Test that a created message can be sent via multiple producers after being consumed from a single consumer
+ public void testMessageConcurrencyAfterConsumption() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession consumeSession = sf.createSession();
+
+ final ClientProducer mainProducer = consumeSession.createProducer(ADDRESS);
+
+ consumeSession.createQueue(ADDRESS, QUEUE_NAME);
+
+ ClientConsumer consumer = consumeSession.createConsumer(QUEUE_NAME);
+
+
+
+ consumeSession.start();
+
+ Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+ final Set<Sender> senders = new HashSet<Sender>();
+
+ final int numSessions = 100;
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ ClientSession sendSession = sf.createSession();
+
+ sendSessions.add(sendSession);
+
+ ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+ Sender sender = new Sender(numMessages, producer);
+
+ senders.add(sender);
+
+ sender.start();
+ }
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ for (Sender sender: senders)
+ {
+ sender.queue.add(message);
+ }
+ }
+ });
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ byte[] body = RandomUtil.randomBytes(1000);
+
+ ClientMessage message = consumeSession.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(body);
+
+ mainProducer.send(message);
+ }
+
+ for (Sender sender: senders)
+ {
+ sender.join();
+
+ assertFalse(sender.failed);
+ }
+
+ for (ClientSession sendSession: sendSessions)
+ {
+ sendSession.close();
+ }
+
+ consumer.close();
+
+ consumeSession.deleteQueue(QUEUE_NAME);
+
+ consumeSession.close();
+
+ sf.close();
+ }
+
+ private class Sender extends Thread
+ {
+ private final BlockingQueue<ClientMessage> queue = new LinkedBlockingQueue<ClientMessage>();
+
+ private final ClientProducer producer;
+
+ private final int numMessages;
+
+ Sender(final int numMessages, final ClientProducer producer)
+ {
+ this.numMessages = numMessages;
+
+ this.producer = producer;
+ }
+
+ volatile boolean failed;
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = queue.take();
+
+ producer.send(msg);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send message", e);
+
+ failed = true;
+ }
+ }
+ }
+
+}
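The MessageImpl change in this revision makes checkCopy() synchronized so that the check of the copied flag and the copy itself happen as one step when several threads send the same message. A minimal sketch of that guard, under the assumption that the copy is a plain array clone; CopyOnceMessage and runConcurrently are hypothetical names, and the real method body is not shown in the diff above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a copy-once guard; not the MessageImpl code.
final class CopyOnceMessage {
    private final AtomicInteger copies = new AtomicInteger();
    private byte[] buffer;
    private boolean copied;

    CopyOnceMessage(byte[] initial) {
        this.buffer = initial;
    }

    // synchronized makes "check the flag, then copy" atomic, so two threads
    // cannot both see copied == false and both replace the buffer.
    synchronized void checkCopy() {
        if (!copied) {
            buffer = buffer.clone();
            copies.incrementAndGet();
            copied = true;
        }
    }

    int copyCount() {
        return copies.get();
    }

    // Starts the given number of threads, releases them together, and
    // returns how many copies were made in total.
    static int runConcurrently(int threads) {
        final CopyOnceMessage message = new CopyOnceMessage(new byte[1000]);
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    message.checkCopy();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for senders", e);
            }
        }
        return message.copyCount();
    }

    public static void main(String[] args) {
        System.out.println("copies made: " + runConcurrently(16));
    }
}
```

Without the synchronized keyword the flag check and the copy can interleave across threads, which is the kind of race the MessageConcurrencyTest above is meant to provoke.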