[hornetq-commits] JBoss hornetq SVN: r8678 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Dec 11 11:40:22 EST 2009
Author: timfox
Date: 2009-12-11 11:40:21 -0500 (Fri, 11 Dec 2009)
New Revision: 8678
Added:
trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
Fixed a concurrency issue when sending the same message from multiple threads
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-11 16:08:20 UTC (rev 8677)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-11 16:40:21 UTC (rev 8678)
@@ -374,7 +374,7 @@
endOfBodyPosition = -1;
}
- public void checkCopy()
+ public synchronized void checkCopy()
{
if (!copied)
{
@@ -444,7 +444,7 @@
buffer.setIndex(0, endOfMessagePosition);
bufferUsed = true;
-
+
return buffer;
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java 2009-12-11 16:40:21 UTC (rev 8678)
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Integration tests checking that one {@link ClientMessage} instance can be sent
+ * concurrently by many producers, each on its own session and its own thread.
+ * <p>
+ * Every test runs against a server created with createServer(false) in setUp().
+ *
+ * @author Tim Fox
+ */
+public class MessageConcurrencyTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageConcurrencyTest.class);
+   // Server under test; created and started in setUp(), stopped in tearDown().
+   private HornetQServer server;
+
+   private final SimpleString ADDRESS = new SimpleString("MessageConcurrencyTestAddress");
+
+   private final SimpleString QUEUE_NAME = new SimpleString("MessageConcurrencyTestQueue");
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   // Test that a created message can be sent via multiple producers on different sessions concurrently
+   public void testMessageConcurrency() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession createSession = sf.createSession();
+
+      Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+      Set<Sender> senders = new HashSet<Sender>();
+
+      final int numSessions = 100;
+
+      final int numMessages = 1000;
+      // Start one sender thread per session; each blocks on its queue until it is fed below.
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sendSession = sf.createSession();
+
+         sendSessions.add(sendSession);
+
+         ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+         Sender sender = new Sender(numMessages, producer);
+
+         senders.add(sender);
+
+         sender.start();
+      }
+      // Hand the very same message instance to every sender so that they all send it.
+      for (int i = 0; i < numMessages; i++)
+      {
+         byte[] body = RandomUtil.randomBytes(1000);
+
+         ClientMessage message = createSession.createMessage(false);
+
+         message.getBodyBuffer().writeBytes(body);
+
+         for (Sender sender: senders)
+         {
+            sender.queue.add(message);
+         }
+      }
+      // Wait for every sender thread to finish and check that none of them caught an exception.
+      for (Sender sender: senders)
+      {
+         sender.join();
+
+         assertFalse(sender.failed);
+      }
+
+      for (ClientSession sendSession: sendSessions)
+      {
+         sendSession.close();
+      }
+
+      createSession.close();
+
+      sf.close();
+   }
+
+   // Test that a created message can be sent via multiple producers after being consumed from a single consumer
+   public void testMessageConcurrencyAfterConsumption() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession consumeSession = sf.createSession();
+
+      final ClientProducer mainProducer = consumeSession.createProducer(ADDRESS);
+
+      consumeSession.createQueue(ADDRESS, QUEUE_NAME);
+
+      ClientConsumer consumer = consumeSession.createConsumer(QUEUE_NAME);
+
+
+
+      consumeSession.start();
+
+      Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+      final Set<Sender> senders = new HashSet<Sender>();
+
+      final int numSessions = 100;
+
+      final int numMessages = 1000;
+      // Start one sender thread per session; each blocks on its queue until the handler feeds it.
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sendSession = sf.createSession();
+
+         sendSessions.add(sendSession);
+
+         ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+         Sender sender = new Sender(numMessages, producer);
+
+         senders.add(sender);
+
+         sender.start();
+      }
+      // Fan every consumed message out to all senders, so each one re-sends the same instance.
+      consumer.setMessageHandler(new MessageHandler()
+      {
+         public void onMessage(ClientMessage message)
+         {
+            for (Sender sender: senders)
+            {
+               sender.queue.add(message);
+            }
+         }
+      });
+      // Produce the initial messages that the handler above receives from the queue.
+      for (int i = 0; i < numMessages; i++)
+      {
+         byte[] body = RandomUtil.randomBytes(1000);
+
+         ClientMessage message = consumeSession.createMessage(false);
+
+         message.getBodyBuffer().writeBytes(body);
+
+         mainProducer.send(message);
+      }
+      // Wait for every sender thread to finish and check that none of them caught an exception.
+      for (Sender sender: senders)
+      {
+         sender.join();
+
+         assertFalse(sender.failed);
+      }
+
+      for (ClientSession sendSession: sendSessions)
+      {
+         sendSession.close();
+      }
+
+      consumer.close();
+
+      consumeSession.deleteQueue(QUEUE_NAME);
+
+      consumeSession.close();
+
+      sf.close();
+   }
+   // Thread that takes numMessages messages from its queue and sends each one via its producer.
+   private class Sender extends Thread
+   {
+      private final BlockingQueue<ClientMessage> queue = new LinkedBlockingQueue<ClientMessage>();
+
+      private final ClientProducer producer;
+
+      private final int numMessages;
+
+      Sender(final int numMessages, final ClientProducer producer)
+      {
+         this.numMessages = numMessages;
+
+         this.producer = producer;
+      }
+      // Set when run() catches an exception; volatile because the test thread reads it.
+      volatile boolean failed;
+      @Override
+      public void run()
+      {
+         try
+         {
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage msg = queue.take();
+
+               producer.send(msg);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to send message", e);
+
+            failed = true;
+         }
+      }
+   }
+
+}
More information about the hornetq-commits mailing list