Author: clebert.suconic(a)jboss.com
Date: 2009-11-03 17:03:58 -0500 (Tue, 03 Nov 2009)
New Revision: 8200
Added:
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-202 - Fixing flow control on LargeMessage when
using Netty
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-03
18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -480,7 +480,8 @@
}
// Flow control for the first packet, we will have others
- flowControl(packet.getPacketSize(), false);
+ // It's using the RequiredBufferSize as the getSize() could be different
between transports
+ flowControl(packet.getRequiredBufferSize(), false);
ClientMessageInternal currentChunkMessage = new
ClientMessageImpl(packet.getDeliveryCount());
@@ -512,7 +513,6 @@
{
return;
}
-
currentLargeMessageBuffer.addPacket(chunk);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-03
18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -162,7 +162,7 @@
outStream.write(packet.getBody());
- flowControlCredit = packet.getPacketSize();
+ flowControlCredit = packet.getRequiredBufferSize();
continues = packet.isContinues();
notifyAll();
@@ -248,7 +248,7 @@
{
break;
}
- totalFlowControl += packet.getPacketSize();
+ totalFlowControl += packet.getRequiredBufferSize();
continues = packet.isContinues();
sendPacketToOutput(output, packet);
}
@@ -1239,7 +1239,7 @@
throw new IndexOutOfBoundsException();
}
- consumerInternal.flowControl(currentPacket.getPacketSize(),
!currentPacket.isContinues());
+ consumerInternal.flowControl(currentPacket.getRequiredBufferSize(),
!currentPacket.isContinues());
packetPosition += sizeToAdd;
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-03
18:27:35 UTC (rev 8199)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -31,12 +31,6 @@
{
// Constants -----------------------------------------------------
- public static final int SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE = BASIC_PACKET_SIZE
+ DataConstants.SIZE_LONG +
-
DataConstants.SIZE_LONG +
-
DataConstants.SIZE_INT +
-
DataConstants.SIZE_BOOLEAN +
-
DataConstants.SIZE_INT;
-
private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
// Attributes ----------------------------------------------------
@@ -142,14 +136,30 @@
{
if (largeMessage)
{
- return SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + largeMessageHeader.length;
+ return BASIC_PACKET_SIZE +
+ // consumerID
+ DataConstants.SIZE_LONG +
+ // deliveryCount
+ DataConstants.SIZE_INT +
+ // largeMessage (boolean)
+ DataConstants.SIZE_BOOLEAN +
+ // LargeMessageSize (Long)
+ DataConstants.SIZE_LONG +
+ // largeMessageHeader.length (int)
+ DataConstants.SIZE_INT +
+ // ByteArray size
+ largeMessageHeader.length;
}
else
{
return BASIC_PACKET_SIZE +
+ // consumerID
DataConstants.SIZE_LONG +
+ // deliveryCount
DataConstants.SIZE_INT +
+ // isLargeMessage
DataConstants.SIZE_BOOLEAN +
+ // message.encoding
(serverMessage != null ? serverMessage.getEncodeSize() :
clientMessage.getEncodeSize());
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-03
18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -331,7 +331,7 @@
if (trace)
{
- log.trace("Received " + credits +
+ trace("Received " + credits +
" credits, previous value = " +
previous +
" currentValue = " +
@@ -695,8 +695,8 @@
if (availableCredits != null)
{
// Flow control needs to be done in advance.
-
- //Again WHY? Is this necessary now we don't replicate sessions?
+ // If we take out credits as we send, the client would be sending credits
back as we are delivering
+ // as a result we would fire up a lot of packets over using the channel.
precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
}
else
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-03
18:27:35 UTC (rev 8199)
+++
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -49,9 +49,14 @@
private static final boolean isTrace = log.isTraceEnabled();
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
private int getMessageEncodeSize(final SimpleString address) throws Exception
{
- ClientSessionFactory cf = createInVMFactory();
+ ClientSessionFactory cf = createFactory(isNetty());
ClientSession session = cf.createSession(false, true, true);
ClientMessage message = session.createClientMessage(false);
// we need to set the destination so we can calculate the encodesize correctly
@@ -69,8 +74,8 @@
* */
public void testSendWindowSize() throws Exception
{
- HornetQServer messagingService = createServer(false);
- ClientSessionFactory cf = createInVMFactory();
+ HornetQServer messagingService = createServer(false, isNetty());
+ ClientSessionFactory cf = createFactory(isNetty());
try
{
messagingService.start();
@@ -124,7 +129,7 @@
public void testSlowConsumerBufferingOne() throws Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
ClientSession session = null;
@@ -135,7 +140,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(1);
session = sf.createSession(false, true, true);
@@ -216,7 +221,7 @@
private void internalTestSlowConsumerNoBuffer(final boolean largeMessages) throws
Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
ClientSession session = null;
@@ -227,7 +232,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(0);
if (largeMessages)
@@ -346,7 +351,7 @@
private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws
Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession session1 = null;
ClientSession session2 = null;
@@ -357,7 +362,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(0);
@@ -529,7 +534,7 @@
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean
largeMessages) throws Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
ClientSession session = null;
@@ -540,7 +545,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(0);
if (largeMessages)
@@ -694,7 +699,7 @@
private void internalTestSlowConsumerOnMessageHandlerBufferOne(final boolean
largeMessage) throws Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
ClientSession session = null;
@@ -705,7 +710,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(1);
if (largeMessage)
@@ -865,7 +870,7 @@
private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
{
- HornetQServer server = createServer(false);
+ HornetQServer server = createServer(false, isNetty());
ClientSession sessionA = null;
ClientSession sessionB = null;
@@ -876,7 +881,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setConsumerWindowSize(-1);
if (largeMessages)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-03
18:27:35 UTC (rev 8199)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -38,7 +38,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.DataConstants;
@@ -73,45 +72,35 @@
// Public --------------------------------------------------------
-// public void testFlowControlWithSyncReceiveNettyZeroConsumerWindowSize() throws
Exception
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+/// Those tests are duplicating ConsumerWindowSizeTest and NettyConsumerWindowSizeTest.
Do we need those here?
+//
+// public void testFlowControlWithSyncReceiveZeroConsumerWindowSize() throws Exception
// {
-// testFlowControlWithSyncReceive(true, 0);
+// testFlowControlWithSyncReceive(0);
// }
-//
-// public void testFlowControlWithSyncReceiveInVMZeroConsumerWindowSize() throws
Exception
+//
+// public void testFlowControlWithSyncReceiveSmallConsumerWindowSize() throws
Exception
// {
-// testFlowControlWithSyncReceive(false, 0);
+// testFlowControlWithSyncReceive(1000);
// }
-//
-// public void testFlowControlWithSyncReceiveNettySmallConsumerWindowSize() throws
Exception
+//
+// private void testFlowControlWithSyncReceive(final int consumerWindowSize) throws
Exception
// {
-// testFlowControlWithSyncReceive(true, 1000);
-// }
-//
-// public void testFlowControlWithSyncReceiveInVMSmallConsumerWindowSize() throws
Exception
-// {
-// testFlowControlWithSyncReceive(false, 1000);
-// }
-//
-// private void testFlowControlWithSyncReceive(final boolean netty, final int
consumerWindowSize) throws Exception
-// {
// ClientSession session = null;
//
// try
// {
-// if (netty)
-// {
-// server = createServer(true, createDefaultConfig(true));
-// }
-// else
-// {
-// server = createServer(true);
-// }
+// server = createServer(true, isNetty());
//
// server.start();
//
-// ClientSessionFactory sf = createInVMFactory();
-//
+// ClientSessionFactory sf = createFactory(isNetty());
+//
// sf.setConsumerWindowSize(consumerWindowSize);
// sf.setMinLargeMessageSize(1000);
//
@@ -130,31 +119,31 @@
// Message clientFile = createLargeClientMessage(session, messageSize, true);
//
// producer.send(clientFile);
-//
+//
// log.info("Sent message " + i);
// }
//
// ClientConsumer consumer = session.createConsumer(ADDRESS);
-//
+//
// session.start();
-//
+//
// for (int i = 0; i < numMessages; i++)
// {
// ClientMessage msg = consumer.receive(1000);
-//
+//
// int availBytes = msg.getBody().readableBytes();
-//
+//
// assertEquals(messageSize, availBytes);
-//
+//
// byte[] bytes = new byte[availBytes];
-//
+//
// msg.getBody().readBytes(bytes);
//
// msg.acknowledge();
-//
+//
// log.info("Received message " + i);
// }
-//
+//
// session.close();
//
// validateNoFilesOnLargeDir();
@@ -178,151 +167,127 @@
// }
// }
// }
-//
-// public void testFlowControlWithListenerNettyZeroConsumerWindowSize() throws
Exception
+//
+// public void testFlowControlWithListenerZeroConsumerWindowSize() throws Exception
// {
-// testFlowControlWithListener(true, 0);
+// testFlowControlWithListener(0);
// }
-//
-// public void testFlowControlWithListenerInVMZeroConsumerWindowSize() throws
Exception
+//
+// public void testFlowControlWithListenerSmallConsumerWindowSize() throws Exception
// {
-// testFlowControlWithListener(false, 0);
+// testFlowControlWithListener(1000);
// }
-//
-// public void testFlowControlWithListenerNettySmallConsumerWindowSize() throws
Exception
+//
+// private void testFlowControlWithListener(final int consumerWindowSize) throws
Exception
// {
-// testFlowControlWithListener(true, 1000);
+// ClientSession session = null;
+//
+// try
+// {
+// server = createServer(true, isNetty());
+//
+// server.start();
+//
+// ClientSessionFactory sf;
+//
+// sf = createFactory(isNetty());
+//
+// sf.setConsumerWindowSize(consumerWindowSize);
+// sf.setMinLargeMessageSize(1000);
+//
+// final int messageSize = 10000;
+//
+// session = sf.createSession(false, true, true);
+//
+// session.createTemporaryQueue(ADDRESS, ADDRESS);
+//
+// ClientProducer producer = session.createProducer(ADDRESS);
+//
+// final int numMessages = 1000;
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+// Message clientFile = createLargeClientMessage(session, messageSize,
false);
+//
+// producer.send(clientFile);
+//
+// log.info("Sent message " + i);
+// }
+//
+// ClientConsumer consumer = session.createConsumer(ADDRESS);
+//
+// class MyHandler implements MessageHandler
+// {
+// int count = 0;
+//
+// final CountDownLatch latch = new CountDownLatch(1);
+//
+// volatile Exception exception;
+//
+// public void onMessage(ClientMessage message)
+// {
+// try
+// {
+// log.info("got message " + count);
+//
+// int availBytes = message.getBody().readableBytes();
+//
+// assertEquals(messageSize, availBytes);
+//
+// byte[] bytes = new byte[availBytes];
+//
+// message.getBody().readBytes(bytes);
+//
+// message.acknowledge();
+//
+// if (++count == numMessages)
+// {
+// latch.countDown();
+// }
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to handle message", e);
+//
+// this.exception = e;
+// }
+// }
+// }
+//
+// MyHandler handler = new MyHandler();
+//
+// consumer.setMessageHandler(handler);
+//
+// session.start();
+//
+// handler.latch.await(10000, TimeUnit.MILLISECONDS);
+//
+// assertNull(handler.exception);
+//
+// session.close();
+//
+// validateNoFilesOnLargeDir();
+// }
+// finally
+// {
+// try
+// {
+// server.stop();
+// }
+// catch (Throwable ignored)
+// {
+// }
+//
+// try
+// {
+// session.close();
+// }
+// catch (Throwable ignored)
+// {
+// }
+// }
// }
-//
-// public void testFlowControlWithListenerInVMSmallConsumerWindowSize() throws
Exception
-// {
-// testFlowControlWithListener(false, 1000);
-// }
-
- private void testFlowControlWithListener(final boolean netty, final int
consumerWindowSize) throws Exception
- {
- ClientSession session = null;
- try
- {
- if (netty)
- {
- server = createServer(true, createDefaultConfig(true));
- }
- else
- {
- server = createServer(true);
- }
-
- server.start();
-
- ClientSessionFactory sf;
-
- if (netty)
- {
- sf = createNettyFactory();
- }
- else
- {
- sf = createInVMFactory();
- }
-
- sf.setConsumerWindowSize(consumerWindowSize);
- sf.setMinLargeMessageSize(1000);
-
- final int messageSize = 10000;
-
- session = sf.createSession(false, true, true);
-
- session.createTemporaryQueue(ADDRESS, ADDRESS);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 1000;
-
- for (int i = 0; i < numMessages; i++)
- {
- Message clientFile = createLargeClientMessage(session, messageSize, false);
-
- producer.send(clientFile);
-
- log.info("Sent message " + i);
- }
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- class MyHandler implements MessageHandler
- {
- int count = 0;
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- volatile Exception exception;
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- log.info("got message " + count);
-
- int availBytes = message.getBody().readableBytes();
-
- assertEquals(messageSize, availBytes);
-
- byte[] bytes = new byte[availBytes];
-
- message.getBody().readBytes(bytes);
-
- message.acknowledge();
-
- if (++count == numMessages)
- {
- latch.countDown();
- }
- }
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
-
- this.exception = e;
- }
- }
- }
-
- MyHandler handler = new MyHandler();
-
- consumer.setMessageHandler(handler);
-
- session.start();
-
- handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
- assertNull(handler.exception);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -331,11 +296,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -400,11 +365,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -453,11 +418,11 @@
session.close();
server.stop();
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- sf = createInVMFactory();
+ sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -527,11 +492,12 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
+
session = sf.createSession(false, false, false);
@@ -607,11 +573,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -742,11 +708,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -818,11 +784,11 @@
session.close();
server.stop();
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- sf = createInVMFactory();
+ sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -877,7 +843,7 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
@@ -889,7 +855,7 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -927,11 +893,11 @@
session.close();
server.stop();
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- sf = createInVMFactory();
+ sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -992,11 +958,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
session = sf.createSession(false, false, false);
@@ -1502,11 +1468,6 @@
testPageOnLargeMessage(true, false);
}
- public void testPageOnLargeMessageNullPersistence() throws Exception
- {
- testPageOnLargeMessage(false, false);
- }
-
public void testSendSmallMessageXA() throws Exception
{
testChunks(true, false, true, false, true, false, false, true, false, 100, 4,
RECEIVE_WAIT_TIME, 0);
@@ -1595,13 +1556,13 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
SimpleString queue[] = new SimpleString[] { new
SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -1678,13 +1639,13 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
SimpleString queue[] = new SimpleString[] { new
SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -1708,11 +1669,11 @@
server.stop();
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- sf = createInVMFactory();
+ sf = createFactory(isNetty());
session = sf.createSession(null, null, false, true, true, false, 0);
}
@@ -1762,11 +1723,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
session = sf.createSession(isXA, false, false);
@@ -1849,11 +1810,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
ClientSession session = sf.createSession(isXA, false, false);
@@ -1983,11 +1944,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setMinLargeMessageSize(1024);
sf.setConsumerWindowSize(1024 * 1024);
@@ -2085,11 +2046,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setMinLargeMessageSize(1024);
sf.setConsumerWindowSize(1024 * 1024);
@@ -2187,11 +2148,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setMinLargeMessageSize(100 * 1024);
@@ -2260,11 +2221,11 @@
try
{
- server = createServer(true);
+ server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setMinLargeMessageSize(1024);
@@ -2357,7 +2318,7 @@
protected void testPageOnLargeMessage(final boolean realFiles, final boolean
sendBlocking) throws Exception
{
- Configuration config = createDefaultConfig();
+ Configuration config = createDefaultConfig(isNetty());
final int PAGE_MAX = 20 * 1024;
@@ -2366,7 +2327,6 @@
HashMap<String, AddressSettings> map = new HashMap<String,
AddressSettings>();
AddressSettings value = new AddressSettings();
- value.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
map.put(ADDRESS.toString(), value);
server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
@@ -2377,7 +2337,7 @@
try
{
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
if (sendBlocking)
{
@@ -2431,7 +2391,7 @@
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
- sf = createInVMFactory();
+ sf = createFactory(isNetty());
}
session = sf.createSession(null, null, false, true, true, false, 0);
Added:
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java 2009-11-03
22:03:58 UTC (rev 8200)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * A NettyConsumerWindowSizeTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyConsumerWindowSizeTest extends ConsumerWindowSizeTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-03 18:27:35 UTC
(rev 8199)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-03 22:03:58 UTC
(rev 8200)
@@ -143,9 +143,15 @@
protected HornetQServer createServer(final boolean realFiles)
{
- return createServer(realFiles, createDefaultConfig(), -1, -1, new
HashMap<String, AddressSettings>());
+ return createServer(realFiles, false);
}
+
+ protected HornetQServer createServer(final boolean realFiles, final boolean netty)
+ {
+ return createServer(realFiles, createDefaultConfig(netty), -1, -1, new
HashMap<String, AddressSettings>());
+ }
+
protected HornetQServer createServer(final boolean realFiles, final Configuration
configuration)
{
return createServer(realFiles, configuration, -1, -1, new HashMap<String,
AddressSettings>());
@@ -289,6 +295,18 @@
return configuration;
}
+ protected ClientSessionFactory createFactory(boolean isNetty)
+ {
+ if (isNetty)
+ {
+ return createNettyFactory();
+ }
+ else
+ {
+ return createInVMFactory();
+ }
+ }
+
protected ClientSessionFactory createInVMFactory()
{
return createFactory(INVM_CONNECTOR_FACTORY);