Author: timfox
Date: 2009-11-03 13:03:04 -0500 (Tue, 03 Nov 2009)
New Revision: 8196
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
some tweaks and some failing tests in LargeMessageTest
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -236,9 +236,20 @@
SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
session.workDone();
-
+
+ boolean large;
+
if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
{
+ large = true;
+ }
+ else
+ {
+ large = false;
+ }
+
+ if (large)
+ {
sendMessageInChunks(sendBlocking, msg);
}
else if (sendBlocking)
@@ -257,7 +268,17 @@
//Note, that for a large message, the encode size only includes the properties + headers
//Not the continuations, but this is ok since we are only interested in limiting the amount of
//data in *memory* and continuations go straight to the disk
- theCredits.acquireCredits(msg.getEncodeSize());
+
+ if (large)
+ {
+ //TODO this is pretty hacky - we should define consistent meanings of encode size
+
+ theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+ }
+ else
+ {
+ theCredits.acquireCredits(msg.getEncodeSize());
+ }
}
catch (InterruptedException e)
{
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -153,6 +153,12 @@
{
}
+
+ @Override
+ public synchronized int getEncodeSize()
+ {
+ return getHeadersAndPropertiesEncodeSize();
+ }
public LargeMessageEncodingContext createNewContext()
{
Modified: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -70,7 +70,6 @@
public Response propose(final Proposal proposal) throws Exception
{
- log.info("proposing proposal " + proposal);
if (proposal.getClusterName() == null)
{
GroupBinding original = map.get(proposal.getGroupId());
Modified: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -101,8 +101,6 @@
Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
- log.info("sending proposal " + proposal);
-
managementService.sendNotification(notification);
sendCondition.await(timeout, TimeUnit.MILLISECONDS);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -88,7 +88,6 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
@@ -166,6 +165,8 @@
private volatile LargeServerMessage currentLargeMessage;
private ServerSessionPacketHandler handler;
+
+ private boolean closed;
// Constructors ---------------------------------------------------------------------------------
@@ -287,8 +288,6 @@
}
}
- private boolean closed;
-
public synchronized void close() throws Exception
{
if (tx != null && tx.getXid() == null)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -14,6 +14,8 @@
package org.hornetq.tests.integration.client;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -26,6 +28,7 @@
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
@@ -70,6 +73,256 @@
// Public --------------------------------------------------------
+// public void testFlowControlWithSyncReceiveNettyZeroConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithSyncReceive(true, 0);
+// }
+//
+// public void testFlowControlWithSyncReceiveInVMZeroConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithSyncReceive(false, 0);
+// }
+//
+// public void testFlowControlWithSyncReceiveNettySmallConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithSyncReceive(true, 1000);
+// }
+//
+// public void testFlowControlWithSyncReceiveInVMSmallConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithSyncReceive(false, 1000);
+// }
+//
+// private void testFlowControlWithSyncReceive(final boolean netty, final int consumerWindowSize) throws Exception
+// {
+// ClientSession session = null;
+//
+// try
+// {
+// if (netty)
+// {
+// server = createServer(true, createDefaultConfig(true));
+// }
+// else
+// {
+// server = createServer(true);
+// }
+//
+// server.start();
+//
+// ClientSessionFactory sf = createInVMFactory();
+//
+// sf.setConsumerWindowSize(consumerWindowSize);
+// sf.setMinLargeMessageSize(1000);
+//
+// int messageSize = 10000;
+//
+// session = sf.createSession(false, true, true);
+//
+// session.createTemporaryQueue(ADDRESS, ADDRESS);
+//
+// ClientProducer producer = session.createProducer(ADDRESS);
+//
+// final int numMessages = 1000;
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+// Message clientFile = createLargeClientMessage(session, messageSize, true);
+//
+// producer.send(clientFile);
+//
+// log.info("Sent message " + i);
+// }
+//
+// ClientConsumer consumer = session.createConsumer(ADDRESS);
+//
+// session.start();
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+// ClientMessage msg = consumer.receive(1000);
+//
+// int availBytes = msg.getBody().readableBytes();
+//
+// assertEquals(messageSize, availBytes);
+//
+// byte[] bytes = new byte[availBytes];
+//
+// msg.getBody().readBytes(bytes);
+//
+// msg.acknowledge();
+//
+// log.info("Received message " + i);
+// }
+//
+// session.close();
+//
+// validateNoFilesOnLargeDir();
+// }
+// finally
+// {
+// try
+// {
+// server.stop();
+// }
+// catch (Throwable ignored)
+// {
+// }
+//
+// try
+// {
+// session.close();
+// }
+// catch (Throwable ignored)
+// {
+// }
+// }
+// }
+//
+// public void testFlowControlWithListenerNettyZeroConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithListener(true, 0);
+// }
+//
+// public void testFlowControlWithListenerInVMZeroConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithListener(false, 0);
+// }
+//
+// public void testFlowControlWithListenerNettySmallConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithListener(true, 1000);
+// }
+//
+// public void testFlowControlWithListenerInVMSmallConsumerWindowSize() throws Exception
+// {
+// testFlowControlWithListener(false, 1000);
+// }
+
+ private void testFlowControlWithListener(final boolean netty, final int consumerWindowSize) throws Exception
+ {
+ ClientSession session = null;
+
+ try
+ {
+ if (netty)
+ {
+ server = createServer(true, createDefaultConfig(true));
+ }
+ else
+ {
+ server = createServer(true);
+ }
+
+ server.start();
+
+ ClientSessionFactory sf;
+
+ if (netty)
+ {
+ sf = createNettyFactory();
+ }
+ else
+ {
+ sf = createInVMFactory();
+ }
+
+ sf.setConsumerWindowSize(consumerWindowSize);
+ sf.setMinLargeMessageSize(1000);
+
+ final int messageSize = 10000;
+
+ session = sf.createSession(false, true, true);
+
+ session.createTemporaryQueue(ADDRESS, ADDRESS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, false);
+
+ producer.send(clientFile);
+
+ log.info("Sent message " + i);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ class MyHandler implements MessageHandler
+ {
+ int count = 0;
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile Exception exception;
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ log.info("got message " + count);
+
+ int availBytes = message.getBody().readableBytes();
+
+ assertEquals(messageSize, availBytes);
+
+ byte[] bytes = new byte[availBytes];
+
+ message.getBody().readBytes(bytes);
+
+ message.acknowledge();
+
+ if (++count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle message", e);
+
+ this.exception = e;
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ session.start();
+
+ handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ assertNull(handler.exception);
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -437,9 +690,9 @@
consumerExpiry = session.createConsumer(ADDRESS_DLA);
msg1 = consumerExpiry.receive(5000);
-
+
assertNotNull(msg1);
-
+
msg1.acknowledge();
for (int i = 0; i < messageSize; i++)
@@ -545,7 +798,7 @@
consumerExpiry.close();
for (int i = 0; i < 10; i++)
- {
+ {
consumerExpiry = session.createConsumer(ADDRESS_DLA);
msg1 = consumerExpiry.receive(5000);
@@ -1054,37 +1307,121 @@
public void testFilePersistenceDelayed() throws Exception
{
- testChunks(false, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ testChunks(false,
+ false,
+ true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 2000);
}
public void testFilePersistenceDelayedConsumer() throws Exception
{
- testChunks(false, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ testChunks(false,
+ false,
+ true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ true,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 2000);
}
public void testFilePersistenceDelayedXA() throws Exception
{
- testChunks(true, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ testChunks(true,
+ false,
+ true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 2000);
}
public void testFilePersistenceDelayedXAConsumer() throws Exception
{
- testChunks(true, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ testChunks(true,
+ false,
+ true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ true,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 2000);
}
public void testNullPersistence() throws Exception
{
- testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ testChunks(false,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ true,
+ true,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 0);
}
public void testNullPersistenceConsumer() throws Exception
{
- testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ testChunks(false,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ true,
+ true,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 0);
}
public void testNullPersistenceXA() throws Exception
{
- testChunks(true, false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ testChunks(true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ true,
+ false,
+ 1,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 0);
}
public void testNullPersistenceXAConsumer() throws Exception
@@ -1094,22 +1431,70 @@
public void testNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ testChunks(false,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ 100,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 100);
}
public void testNullPersistenceDelayedConsumer() throws Exception
{
- testChunks(false, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ testChunks(false,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ false,
+ true,
+ 100,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 100);
}
public void testNullPersistenceDelayedXA() throws Exception
{
- testChunks(true, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ testChunks(true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false,
+ 100,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 100);
}
public void testNullPersistenceDelayedXAConsumer() throws Exception
{
- testChunks(true, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ testChunks(true,
+ false,
+ true,
+ false,
+ false,
+ false,
+ false,
+ false,
+ true,
+ 100,
+ LARGE_MESSAGE_SIZE,
+ RECEIVE_WAIT_TIME,
+ 100);
}
public void testPageOnLargeMessage() throws Exception
@@ -1355,7 +1740,7 @@
{
internalTestSendRollback(true, true);
}
-
+
public void testSendRollbackXANonDurable() throws Exception
{
internalTestSendRollback(true, false);
@@ -1365,7 +1750,7 @@
{
internalTestSendRollback(false, true);
}
-
+
public void testSendRollbackNonDurable() throws Exception
{
internalTestSendRollback(false, false);
@@ -2032,7 +2417,7 @@
producer.send(message);
}
-
+
ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
producer.send(clientFile);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-03 18:03:04 UTC (rev 8196)
@@ -86,7 +86,7 @@
testFlowControl(false, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
- public void testFlowControlLargeMessagesSmallWindowSize() throws Exception
+ public void testFlowControlLargerMessagesSmallWindowSize() throws Exception
{
testFlowControl(false, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
@@ -141,7 +141,7 @@
testFlowControl(true, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
- public void testFlowControlLargeMessagesSmallWindowSizeNetty() throws Exception
+ public void testFlowControlLargerMessagesSmallWindowSizeNetty() throws Exception
{
testFlowControl(true, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
@@ -173,11 +173,50 @@
final long consumerDelay,
final boolean anon) throws Exception
{
+ testFlowControl(netty,
+ numMessages,
+ messageSize,
+ maxSize,
+ producerWindowSize,
+ consumerWindowSize,
+ ackBatchSize,
+ numConsumers,
+ numProducers,
+ consumerDelay,
+ anon,
+ -1,
+ false);
+ }
+
+ public void testFlowControlLargeMessages() throws Exception
+ {
+ testFlowControl(true, 1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
+ }
+
+ public void testFlowControlLargeMessages2() throws Exception
+ {
+ testFlowControl(true, 1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
+ }
+
+ private void testFlowControl(final boolean netty,
+ final int numMessages,
+ final int messageSize,
+ final int maxSize,
+ final int producerWindowSize,
+ final int consumerWindowSize,
+ final int ackBatchSize,
+ final int numConsumers,
+ final int numProducers,
+ final long consumerDelay,
+ final boolean anon,
+ final int minLargeMessageSize,
+ final boolean realFiles) throws Exception
+ {
final SimpleString address = new SimpleString("testaddress");
Configuration config = super.createDefaultConfig(netty);
- HornetQServer server = createServer(false, config);
+ HornetQServer server = createServer(realFiles, config);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(maxSize);
@@ -203,6 +242,11 @@
sf.setConsumerWindowSize(consumerWindowSize);
sf.setAckBatchSize(ackBatchSize);
+ if (minLargeMessageSize != -1)
+ {
+ sf.setMinLargeMessageSize(minLargeMessageSize);
+ }
+
ClientSession session = sf.createSession(false, true, true, true);
final String queueName = "testqueue";
@@ -212,8 +256,7 @@
session.createQueue(address, new SimpleString(queueName + i), null, false);
}
- session.start();
-
+
class MyHandler implements MessageHandler
{
int count = 0;
@@ -226,6 +269,16 @@
{
try
{
+ log.info("got message " + count);
+
+ int availBytes = message.getBody().readableBytes();
+
+ assertEquals(messageSize, availBytes);
+
+ byte[] bytes = new byte[availBytes];
+
+ message.getBody().readBytes(bytes);
+
message.acknowledge();
if (++count == numMessages * numProducers)
@@ -253,6 +306,8 @@
{
handlers[i] = new MyHandler();
+ log.info("created consumer");
+
ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
consumer.setMessageHandler(handlers[i]);
@@ -289,10 +344,16 @@
else
{
producers[j].send(message);
+
+ //log.info("sent message " + i);
}
}
}
+
+ log.info("sent messages");
+
+ session.start();
for (int i = 0; i < numConsumers; i++)
{
@@ -713,7 +774,6 @@
assertFalse(store.isExceededAvailableCredits());
server.stop();
- }
-
-
+ }
+
}