[jboss-cvs] JBoss Messaging SVN: r6208 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 27 20:41:23 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-27 20:41:23 -0400 (Fri, 27 Mar 2009)
New Revision: 6208
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
Log:
MessageRateTest and fixes on ConsumerRate
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-27 23:08:59 UTC (rev 6207)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-28 00:41:23 UTC (rev 6208)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.TokenBucketLimiter;
import java.io.File;
import java.io.IOException;
@@ -75,6 +76,8 @@
private final File directory;
private ClientMessageInternal currentChunkMessage;
+
+ private final TokenBucketLimiter rateLimiter;
private volatile Thread receiverThread;
@@ -103,6 +106,7 @@
final long id,
final int clientWindowSize,
final int ackBatchSize,
+ final TokenBucketLimiter rateLimiter,
final Executor executor,
final Channel channel,
final File directory)
@@ -112,6 +116,8 @@
this.channel = channel;
this.session = session;
+
+ this.rateLimiter = rateLimiter;
sessionExecutor = executor;
@@ -128,7 +134,13 @@
public ClientMessage receive(long timeout) throws MessagingException
{
checkClosed();
+
+ if (rateLimiter != null)
+ {
+ rateLimiter.limit();
+ }
+
if (handler != null)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,
@@ -175,7 +187,7 @@
catch (InterruptedException e)
{
}
-
+
if (m != null || closed)
{
break;
@@ -603,6 +615,12 @@
if (theHandler != null)
{
+
+ if (rateLimiter != null)
+ {
+ rateLimiter.limit();
+ }
+
synchronized (this)
{
message = buffer.poll();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-27 23:08:59 UTC (rev 6207)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-28 00:41:23 UTC (rev 6208)
@@ -87,6 +87,7 @@
import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.SimpleIDGenerator;
import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.TokenBucketLimiter;
import org.jboss.messaging.utils.TokenBucketLimiterImpl;
/*
@@ -357,7 +358,7 @@
final int maxRate,
final boolean browseOnly) throws MessagingException
{
- return internalCreateConsumer(queueName, filterString, windowSize, browseOnly, null);
+ return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, null);
}
public ClientConsumer createConsumer(final String queueName,
@@ -422,7 +423,7 @@
final int maxRate,
final boolean browseOnly) throws MessagingException
{
- return internalCreateConsumer(queueName, filterString, windowSize, browseOnly, directory);
+ return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, directory);
}
public ClientConsumer createFileConsumer(final File directory,
@@ -1198,6 +1199,7 @@
private ClientConsumer internalCreateConsumer(final SimpleString queueName,
final SimpleString filterString,
final int windowSize,
+ final int maxRate,
final boolean browseOnly,
final File directory) throws MessagingException
{
@@ -1244,6 +1246,7 @@
consumerID,
clientWindowSize,
ackBatchSize,
+ consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null,
executor,
channel,
directory);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java 2009-03-28 00:41:23 UTC (rev 6208)
@@ -0,0 +1,232 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Integration tests for the client-side producer and consumer maximum-rate settings.
+ * <p>
+ * Every test starts a messaging service, sets a maximum rate of {@value #MAX_RATE} on the
+ * {@link ClientSessionFactory} (presumably messages per second, given the one-second
+ * assertions below -- confirm against the limiter implementation) and checks that moving
+ * the messages through the rate-limited side takes at least
+ * {@value #MIN_ELAPSED_MILLIS} milliseconds.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class MessageRateTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   /** Name used both as the address and as the queue name by every test. */
+   private static final SimpleString ADDRESS = new SimpleString("ADDRESS");
+
+   /** Maximum rate configured on the session factory. */
+   private static final int MAX_RATE = 10;
+
+   /** Number of messages sent by the producer-rate test. */
+   private static final int PRODUCED_MESSAGES = 10;
+
+   /** Number of messages pre-loaded into the queue by the consumer-rate tests. */
+   private static final int CONSUMED_MESSAGES = 12;
+
+   /** Minimum time the rate-limited part of each test is expected to take. */
+   private static final long MIN_ELAPSED_MILLIS = 1000;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Sends {@value #PRODUCED_MESSAGES} messages through a producer whose factory has a
+    * producer max rate of {@value #MAX_RATE} and asserts that sending took at least
+    * {@value #MIN_ELAPSED_MILLIS} ms.
+    *
+    * @throws Exception if the service, session or producer fails
+    */
+   public void testProduceRate() throws Exception
+   {
+      MessagingService service = createService(false);
+      ClientSession session = null;
+
+      try
+      {
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setProducerMaxRate(MAX_RATE);
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+         long start = System.currentTimeMillis();
+         for (int i = 0; i < PRODUCED_MESSAGES; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+         long end = System.currentTimeMillis();
+
+         assertTrue("TotalTime = " + (end - start), end - start >= MIN_ELAPSED_MILLIS);
+      }
+      finally
+      {
+         closeAndStop(session, service);
+      }
+   }
+
+   /**
+    * Pre-loads {@value #CONSUMED_MESSAGES} messages, then receives them synchronously
+    * through a consumer whose factory has a consumer max rate of {@value #MAX_RATE} and
+    * asserts that receiving took at least {@value #MIN_ELAPSED_MILLIS} ms.
+    *
+    * @throws Exception if the service, session, producer or consumer fails
+    */
+   public void testConsumeRate() throws Exception
+   {
+      MessagingService service = createService(false);
+      ClientSession session = null;
+
+      try
+      {
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerMaxRate(MAX_RATE);
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < CONSUMED_MESSAGES; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         long start = System.currentTimeMillis();
+
+         for (int i = 0; i < CONSUMED_MESSAGES; i++)
+         {
+            ClientMessage message = consumer.receive(1000);
+
+            // Without this check a run of timed-out receives would also push the elapsed
+            // time past the threshold and the test would pass without consuming anything.
+            assertTrue("Message " + i + " was not received within the timeout", message != null);
+         }
+
+         long end = System.currentTimeMillis();
+
+         assertTrue("TotalTime = " + (end - start), end - start >= MIN_ELAPSED_MILLIS);
+      }
+      finally
+      {
+         closeAndStop(session, service);
+      }
+   }
+
+   /**
+    * Pre-loads {@value #CONSUMED_MESSAGES} messages, then consumes them through a
+    * {@link MessageHandler} on a consumer whose factory has a consumer max rate of
+    * {@value #MAX_RATE}. Asserts that every message was acknowledged without error and
+    * that delivery took at least {@value #MIN_ELAPSED_MILLIS} ms.
+    *
+    * @throws Exception if the service, session, producer or consumer fails
+    */
+   public void testConsumeRateListener() throws Exception
+   {
+      MessagingService service = createService(false);
+      ClientSession session = null;
+
+      try
+      {
+         service.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setConsumerMaxRate(MAX_RATE);
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < CONSUMED_MESSAGES; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         final AtomicInteger failures = new AtomicInteger(0);
+
+         final CountDownLatch messages = new CountDownLatch(CONSUMED_MESSAGES);
+
+         consumer.setMessageHandler(new MessageHandler()
+         {
+
+            public void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  message.acknowledge();
+                  messages.countDown();
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace(); // Hudson report
+                  failures.incrementAndGet();
+               }
+            }
+
+         });
+
+         // The clock starts before session.start() because delivery to the handler is
+         // only expected to begin once the session has been started.
+         long start = System.currentTimeMillis();
+         session.start();
+         boolean allDelivered = messages.await(5, TimeUnit.SECONDS);
+         long end = System.currentTimeMillis();
+
+         // Report handler errors first: a failed acknowledge also prevents the latch
+         // from reaching zero, and the failure count is the more useful diagnosis.
+         assertTrue("Handler failures = " + failures.get(), failures.get() == 0);
+         assertTrue("Not all messages were delivered within 5 seconds", allDelivered);
+
+         assertTrue("TotalTime = " + (end - start), end - start >= MIN_ELAPSED_MILLIS);
+      }
+      finally
+      {
+         closeAndStop(session, service);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Closes the session, if one was created, and then stops the service if it is started.
+    * The service is stopped even when closing the session throws.
+    *
+    * @param session the session to close; may be {@code null} if the test failed before creating it
+    * @param service the service created by the test
+    * @throws Exception if closing the session or stopping the service fails
+    */
+   private void closeAndStop(final ClientSession session, final MessagingService service) throws Exception
+   {
+      try
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+      finally
+      {
+         if (service.isStarted())
+         {
+            service.stop();
+         }
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageRateTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
More information about the jboss-cvs-commits
mailing list