[jboss-cvs] JBoss Messaging SVN: r6208 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 27 20:41:23 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-27 20:41:23 -0400 (Fri, 27 Mar 2009)
New Revision: 6208

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
Log:
MessageRateTest and fixes on ConsumerRate

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-27 23:08:59 UTC (rev 6207)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-28 00:41:23 UTC (rev 6208)
@@ -25,6 +25,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.TokenBucketLimiter;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,6 +76,8 @@
    private final File directory;
 
    private ClientMessageInternal currentChunkMessage;
+   
+   private final TokenBucketLimiter rateLimiter;
 
    private volatile Thread receiverThread;
 
@@ -103,6 +106,7 @@
                              final long id,
                              final int clientWindowSize,
                              final int ackBatchSize,
+                             final TokenBucketLimiter rateLimiter,
                              final Executor executor,
                              final Channel channel,
                              final File directory)
@@ -112,6 +116,8 @@
       this.channel = channel;
 
       this.session = session;
+      
+      this.rateLimiter = rateLimiter;
 
       sessionExecutor = executor;
 
@@ -128,7 +134,13 @@
    public ClientMessage receive(long timeout) throws MessagingException
    {
       checkClosed();
+      
+      if (rateLimiter != null)
+      {
+         rateLimiter.limit();
+      }
 
+
       if (handler != null)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
@@ -175,7 +187,7 @@
                   catch (InterruptedException e)
                   {
                   }
-
+                  
                   if (m != null || closed)
                   {
                      break;
@@ -603,6 +615,12 @@
 
       if (theHandler != null)
       {
+         
+         if (rateLimiter != null)
+         {
+            rateLimiter.limit();
+         }
+
          synchronized (this)
          {
             message = buffer.poll();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-27 23:08:59 UTC (rev 6207)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-28 00:41:23 UTC (rev 6208)
@@ -87,6 +87,7 @@
 import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.TokenBucketLimiter;
 import org.jboss.messaging.utils.TokenBucketLimiterImpl;
 
 /*
@@ -357,7 +358,7 @@
                                         final int maxRate,
                                         final boolean browseOnly) throws MessagingException
    {
-      return internalCreateConsumer(queueName, filterString, windowSize, browseOnly, null);
+      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, null);
    }
 
    public ClientConsumer createConsumer(final String queueName,
@@ -422,7 +423,7 @@
                                             final int maxRate,
                                             final boolean browseOnly) throws MessagingException
    {
-      return internalCreateConsumer(queueName, filterString, windowSize, browseOnly, directory);
+      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, directory);
    }
 
    public ClientConsumer createFileConsumer(final File directory,
@@ -1198,6 +1199,7 @@
    private ClientConsumer internalCreateConsumer(final SimpleString queueName,
                                                  final SimpleString filterString,
                                                  final int windowSize,
+                                                 final int maxRate,
                                                  final boolean browseOnly,
                                                  final File directory) throws MessagingException
    {
@@ -1244,6 +1246,7 @@
                                                                consumerID,
                                                                clientWindowSize,
                                                                ackBatchSize,
+                                                               consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, 
                                                                executor,
                                                                channel,
                                                                directory);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java	2009-03-28 00:41:23 UTC (rev 6208)
@@ -0,0 +1,232 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
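+ * Integration tests checking that the producer and consumer max-rate settings of a ClientSessionFactory slow down sending and receiving.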
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class MessageRateTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   private final SimpleString ADDRESS = new SimpleString("ADDRESS");
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /** Sends 10 messages with the factory's producer max rate set to 10 and expects the sends to take at least 1000 ms. */
+   public void testProduceRate() throws Exception
+   {
+      final MessagingService server = createService(false);
+
+      try
+      {
+         server.start();
+
+         final ClientSessionFactory factory = createInVMFactory();
+         factory.setProducerMaxRate(10);
+
+         final ClientSession session = factory.createSession(false, true, true);
+         session.createQueue(ADDRESS, ADDRESS, true);
+         final ClientProducer producer = session.createProducer(ADDRESS);
+
+         final long begin = System.currentTimeMillis();
+         int sent = 0;
+         while (sent < 10)
+         {
+            producer.send(session.createClientMessage(false));
+            sent++;
+         }
+         final long elapsed = System.currentTimeMillis() - begin;
+
+         assertTrue("TotalTime = " + elapsed, elapsed >= 1000);
+      }
+      finally
+      {
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
+
+
+   /**
+    * Sends 12 messages, then receives them synchronously through a consumer whose factory was
+    * configured with {@code setConsumerMaxRate(10)}; every receive must yield a message and the receives must take at least 1000 ms.
+    */
+   public void testConsumeRate() throws Exception
+   {
+      MessagingService service = createService(false);
+
+      try
+      {
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerMaxRate(10);
+
+         ClientSession session = sf.createSession(false, true, true);
+         session.createQueue(ADDRESS, ADDRESS, true);
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < 12; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         long start = System.currentTimeMillis();
+
+         for (int i = 0; i < 12; i++)
+         {
+            // Fail fast if nothing arrives: a timed-out receive (presumably returning null - TODO confirm) would
+            // otherwise only add to the elapsed time and let the assertion below pass with no delivery at all.
+            assertNotNull("message " + i + " was not received", consumer.receive(1000));
+         }
+
+         long end = System.currentTimeMillis();
+
+         assertTrue("TotalTime = " + (end - start), end - start >= 1000);
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+
+   /**
+    * Sends 12 messages, then consumes them through a {@link MessageHandler} on a consumer whose
+    * factory was configured with {@code setConsumerMaxRate(10)}. Asserts that no acknowledge failed,
+    * that all 12 messages were handled within 5 seconds, and that handling took at least 1000 ms.
+    */
+   public void testConsumeRateListener() throws Exception
+   {
+      MessagingService service = createService(false);
+
+      try
+      {
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerMaxRate(10);
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < 12; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         final AtomicInteger failures = new AtomicInteger(0);
+         final CountDownLatch messages = new CountDownLatch(12);
+
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  message.acknowledge();
+                  messages.countDown();
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson report
+                  failures.incrementAndGet();
+               }
+            }
+         });
+
+         long start = System.currentTimeMillis();
+         session.start();
+         boolean allHandled = messages.await(5, TimeUnit.SECONDS);
+         long end = System.currentTimeMillis();
+
+         // Check the handler's error counter first so an acknowledge failure is reported as such, not as a bare latch timeout.
+         assertEquals("acknowledge failures", 0, failures.get());
+         assertTrue("handler did not process all 12 messages within 5 seconds", allHandled);
+         assertTrue("TotalTime = " + (end - start), end - start >= 1000);
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown(); // no cleanup of its own; defers entirely to the superclass
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp(); // no fixture of its own; defers entirely to the superclass
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
___________________________________________________________________
Name: svn:mergeinfo
   + 




More information about the jboss-cvs-commits mailing list