[hornetq-commits] JBoss hornetq SVN: r8678 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 11 11:40:22 EST 2009


Author: timfox
Date: 2009-12-11 11:40:21 -0500 (Fri, 11 Dec 2009)
New Revision: 8678

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
Modified:
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
Log:
fixed concurrency issue when sending same message multiple threads

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-11 16:08:20 UTC (rev 8677)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-11 16:40:21 UTC (rev 8678)
@@ -374,7 +374,7 @@
       endOfBodyPosition = -1;
    }
 
-   public void checkCopy()
+   public synchronized void checkCopy()
    {
       if (!copied)
       {
@@ -444,7 +444,7 @@
          buffer.setIndex(0, endOfMessagePosition);
 
          bufferUsed = true;
-
+         
          return buffer;
       }
    }

Added: trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageConcurrencyTest.java	2009-12-11 16:40:21 UTC (rev 8678)
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * 
+ * A MessageConcurrencyTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class MessageConcurrencyTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ConsumerTest.class);
+
+   private HornetQServer server;
+
+   private final SimpleString ADDRESS = new SimpleString("MessageConcurrencyTestAddress");
+   
+   private final SimpleString QUEUE_NAME = new SimpleString("MessageConcurrencyTestQueue");
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   // Test that a created message can be sent via multiple producers on different sessions concurrently
+   public void testMessageConcurrency() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession createSession = sf.createSession();
+      
+      Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+      Set<Sender> senders = new HashSet<Sender>();
+
+      final int numSessions = 100;
+      
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sendSession = sf.createSession();
+         
+         sendSessions.add(sendSession);
+
+         ClientProducer producer = sendSession.createProducer(ADDRESS);
+         
+         Sender sender = new Sender(numMessages, producer);
+         
+         senders.add(sender);
+         
+         sender.start();
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         byte[] body = RandomUtil.randomBytes(1000);
+
+         ClientMessage message = createSession.createMessage(false);
+         
+         message.getBodyBuffer().writeBytes(body);
+         
+         for (Sender sender: senders)
+         {
+            sender.queue.add(message);
+         }
+      }
+
+      for (Sender sender: senders)
+      {
+         sender.join();
+         
+         assertFalse(sender.failed);
+      }
+      
+      for (ClientSession sendSession: sendSessions)
+      {
+         sendSession.close();
+      }
+       
+      createSession.close();
+
+      sf.close();      
+   }
+   
+   // Test that a created message can be sent via multiple producers after being consumed from a single consumer
+   public void testMessageConcurrencyAfterConsumption() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession consumeSession = sf.createSession();
+      
+      final ClientProducer mainProducer = consumeSession.createProducer(ADDRESS);
+      
+      consumeSession.createQueue(ADDRESS, QUEUE_NAME);
+                  
+      ClientConsumer consumer = consumeSession.createConsumer(QUEUE_NAME);
+      
+      
+      
+      consumeSession.start();
+
+      Set<ClientSession> sendSessions = new HashSet<ClientSession>();
+
+      final Set<Sender> senders = new HashSet<Sender>();
+
+      final int numSessions = 100;
+      
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sendSession = sf.createSession();
+         
+         sendSessions.add(sendSession);
+
+         ClientProducer producer = sendSession.createProducer(ADDRESS);
+         
+         Sender sender = new Sender(numMessages, producer);
+         
+         senders.add(sender);
+         
+         sender.start();
+      }
+      
+      consumer.setMessageHandler(new MessageHandler()
+      {
+         public void onMessage(ClientMessage message)
+         {
+            for (Sender sender: senders)
+            {
+               sender.queue.add(message);
+            }
+         }
+      });
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         byte[] body = RandomUtil.randomBytes(1000);
+
+         ClientMessage message = consumeSession.createMessage(false);
+         
+         message.getBodyBuffer().writeBytes(body);
+         
+         mainProducer.send(message);
+      }
+
+      for (Sender sender: senders)
+      {
+         sender.join();
+         
+         assertFalse(sender.failed);
+      }
+      
+      for (ClientSession sendSession: sendSessions)
+      {
+         sendSession.close();
+      }
+      
+      consumer.close();
+      
+      consumeSession.deleteQueue(QUEUE_NAME);
+      
+      consumeSession.close();
+
+      sf.close();      
+   }
+   
+   private class Sender extends Thread
+   {
+      private final BlockingQueue<ClientMessage> queue = new LinkedBlockingQueue<ClientMessage>();
+
+      private final ClientProducer producer;
+      
+      private final int numMessages;
+
+      Sender(final int numMessages, final ClientProducer producer)
+      {
+         this.numMessages = numMessages;
+         
+         this.producer = producer;
+      }
+      
+      volatile boolean failed;
+
+      public void run()
+      {
+         try
+         {
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage msg = queue.take();
+
+               producer.send(msg);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to send message", e);
+
+            failed = true;
+         }
+      }
+   }
+
+}



More information about the hornetq-commits mailing list