[hornetq-commits] JBoss hornetq SVN: r10063 - in trunk: src/main/org/hornetq/jms/client and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Dec 20 20:50:48 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-20 20:50:48 -0500 (Mon, 20 Dec 2010)
New Revision: 10063
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
Fixing the Large Message tests
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -1730,6 +1730,13 @@
try
{
msg = sourceConsumer.receive(1000);
+
+ if (msg instanceof HornetQMessage)
+ {
+ // We need to check the buffer mainly in the case of LargeMessages
+ // As we need to reconstruct the buffer before resending the message
+ ((HornetQMessage)msg).checkBuffer();
+ }
}
catch (JMSException jmse)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -879,6 +879,11 @@
{
message.getBodyBuffer().resetReaderIndex();
}
+
+ public void checkBuffer()
+ {
+ message.getBodyBuffer();
+ }
public void doBeforeReceive() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.tests.util.RandomUtil;
/**
* A LargeMessageCompressTest
@@ -277,6 +278,111 @@
}
+
+ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception
+ {
+ final int messageSize = 1024 * 1024;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ byte [] msgs = new byte[1024 * 1024];
+ for (int i = 0 ; i < msgs.length; i++)
+ {
+ msgs[i] = RandomUtil.randomByte();
+ }
+
+ Message clientFile = createLargeClientMessage(session, msgs, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ locator = createFactory(isNetty());
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ assertEquals(messageSize, msg1.getBodySize());
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile);
+
+ msg1.saveToOutputStream(output);
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read();
+ assertEquals("position = " + i, msgs[i], b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+
public void testSendServerMessage() throws Exception
{
// doesn't make sense as compressed
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -100,7 +100,7 @@
public long countMessages(final String filter) throws Exception
{
- return (Long)proxy.invokeOperation("countMessages", filter);
+ return ((Number)proxy.invokeOperation("countMessages", filter)).intValue();
}
public boolean expireMessage(final String messageID) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -617,6 +617,13 @@
return createLargeClientMessage(session, numberOfBytes, true);
}
+ protected ClientMessage createLargeClientMessage (final ClientSession session, final byte[] buffer, final boolean durable) throws Exception
+ {
+ ClientMessage msgs = session.createMessage(durable);
+ msgs.getBodyBuffer().writeBytes(buffer);
+ return msgs;
+ }
+
protected ClientMessage createLargeClientMessage(final ClientSession session,
final long numberOfBytes,
final boolean persistent) throws Exception
More information about the hornetq-commits mailing list