[hornetq-commits] JBoss hornetq SVN: r11540 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Oct 14 12:54:26 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-10-14 12:54:26 -0400 (Fri, 14 Oct 2011)
New Revision: 11540
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
JBPAPP-7389 - flow control on large messages
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -408,6 +408,7 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
clientFile.putIntProperty("txid", 2);
+ clientFile.putIntProperty("i", i);
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
for (int start = 0 ; start < 2; start++)
{
+ System.out.println("Start " + start);
sf = locator.createSessionFactory();
@@ -437,6 +439,7 @@
session.start();
for (int i = 0 ; i < 10; i++)
{
+ log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
assertNotNull(msg);
assertEquals(1, msg.getIntProperty("txid").intValue());
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);
More information about the hornetq-commits
mailing list