[hornetq-commits] JBoss hornetq SVN: r9171 - in trunk: src/main/org/hornetq/core/message/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Apr 26 20:41:56 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-04-26 20:41:55 -0400 (Mon, 26 Apr 2010)
New Revision: 9171
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
HORNETQ-296 - LargeMessage and producer
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -25,6 +25,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.utils.DataConstants;
/**
*
@@ -249,6 +250,7 @@
public void open()
{
+ getBodyBuffer().readerIndex(0);
}
public void close()
@@ -257,7 +259,14 @@
public long getLargeBodySize()
{
- return buffer.writerIndex();
+ if (isLargeMessage())
+ {
+ return getBodyBuffer().writerIndex();
+ }
+ else
+ {
+ return getBodyBuffer().writerIndex() - BODY_OFFSET;
+ }
}
public int encode(final ByteBuffer bufferRead) throws HornetQException
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -217,7 +217,7 @@
boolean isLarge;
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage())
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
@@ -60,6 +61,8 @@
private volatile SessionReceiveContinuationMessage currentPacket = null;
private final long totalSize;
+
+ private final int bufferSize;
private boolean streamEnded = false;
@@ -97,6 +100,15 @@
final int readTimeout,
final File cachedFile)
{
+ this(consumerInternal, totalSize, readTimeout, cachedFile, 10 * 1024);
+ }
+
+ public LargeMessageBufferImpl(final ClientConsumerInternal consumerInternal,
+ final long totalSize,
+ final int readTimeout,
+ final File cachedFile,
+ final int bufferSize)
+ {
this.consumerInternal = consumerInternal;
this.readTimeout = readTimeout;
this.totalSize = totalSize;
@@ -108,6 +120,7 @@
{
fileCache = new FileCache(cachedFile);
}
+ this.bufferSize = bufferSize;
}
// Public --------------------------------------------------------
@@ -193,6 +206,7 @@
}
}
+
packets.offer(packet);
}
}
@@ -1020,7 +1034,30 @@
{
return (char)readShort();
}
+
+ public char getChar(final int index)
+ {
+ return (char)getShort(index);
+ }
+ public double getDouble(final int index)
+ {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ public float getFloat(final int index)
+ {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ public HornetQBuffer readBytes(final int length)
+ {
+ byte bytesToGet[] = new byte[length];
+ getBytes(readerIndex, bytesToGet);
+ readerIndex += length;
+ return HornetQBuffers.wrappedBuffer(bytesToGet);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
*/
@@ -1313,8 +1350,6 @@
private class FileCache
{
- private final int BUFFER_SIZE = 10 * 1024;
-
public FileCache(final File cachedFile)
{
this.cachedFile = cachedFile;
@@ -1347,11 +1382,13 @@
throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
}
- readCachePositionStart = position / BUFFER_SIZE * BUFFER_SIZE;
+ readCachePositionStart = position / bufferSize * bufferSize;
+
+ cachedChannel.position(readCachePositionStart);
if (readCache == null)
{
- readCache = ByteBuffer.allocate(BUFFER_SIZE);
+ readCache = ByteBuffer.allocate(bufferSize);
}
readCache.clear();
@@ -1452,80 +1489,47 @@
public ChannelBuffer channelBuffer()
{
- // TODO Auto-generated method stub
return null;
}
public HornetQBuffer copy(final int index, final int length)
{
- // TODO Auto-generated method stub
- return null;
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public HornetQBuffer duplicate()
{
- // TODO Auto-generated method stub
- return null;
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
- public char getChar(final int index)
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public double getDouble(final int index)
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public float getFloat(final int index)
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public HornetQBuffer readBytes(final int length)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
public HornetQBuffer readSlice(final int length)
{
- // TODO Auto-generated method stub
- return null;
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public void setChar(final int index, final char value)
{
- // TODO Auto-generated method stub
-
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public void setDouble(final int index, final double value)
{
- // TODO Auto-generated method stub
-
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public void setFloat(final int index, final float value)
{
- // TODO Auto-generated method stub
-
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public HornetQBuffer slice()
{
- // TODO Auto-generated method stub
- return null;
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
{
- // TODO Auto-generated method stub
-
+ throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
}
}
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -55,7 +55,10 @@
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+
+ public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
+
protected long messageID;
protected SimpleString address;
@@ -248,7 +251,7 @@
{
if (buffer instanceof LargeMessageBufferInternal == false)
{
- bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, buffer, this);
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -137,6 +137,109 @@
}
}
+
+ public void testLargeBufferTransacted() throws Exception
+ {
+ doTestLargeBuffer(true);
+ }
+
+ public void testLargeBufferNotTransacted() throws Exception
+ {
+ doTestLargeBuffer(false);
+ }
+
+ public void doTestLargeBuffer(boolean transacted) throws Exception
+ {
+ final int journalsize = 100 * 1024;
+ final int messageSize = 3 * journalsize;
+ // final int messageSize = 5 * 1024;
+
+ ClientSession session = null;
+
+ try
+ {
+ Configuration config = createDefaultConfig(isNetty());
+ config.setJournalFileSize(journalsize);
+
+ config.setJournalBufferSize_AIO(10 * 1024);
+ config.setJournalBufferSize_NIO(10 * 1024);
+
+ server = createServer(true, config);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ session = sf.createSession(!transacted, !transacted, 0);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = session.createMessage(true);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
+ }
+
+
+ producer.send(clientFile);
+
+ if (transacted)
+ {
+ session.commit();
+ }
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(5000);
+ assertNotNull(msg1);
+
+ Assert.assertNotNull(msg1);
+
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ //System.out.print(msg1.getBodyBuffer().readByte() + " ");
+ //if (i % 100 == 0) System.out.println();
+ assertEquals("position = " + i, getSamplebyte(i), msg1.getBodyBuffer().readByte());
+ }
+
+ msg1.acknowledge();
+
+ consumer.close();
+
+
+ if (transacted)
+ {
+ session.commit();
+ }
+
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLALargeMessage() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -799,11 +902,95 @@
Assert.assertEquals(messageSize, msg2.getBodySize());
- for (int i = 0; i < messageSize; i++)
+ compareString(messageSize, msg2);
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
{
- Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg2.getBodyBuffer().readByte());
+ server.stop();
}
+ catch (Throwable ignored)
+ {
+ }
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testResendCachedSmallStreamMessage() throws Exception
+ {
+ internalTestResendMessage(50000);
+ }
+
+ public void testResendCachedLargeStreamMessage() throws Exception
+ {
+ internalTestCachedResendMessage(150 * 1024);
+ }
+
+ public void internalTestCachedResendMessage(final long messageSize) throws Exception
+ {
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ sf.setMinLargeMessageSize(111);
+
+ sf.setCacheLargeMessagesClient(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message originalMsg = createLargeClientMessage(session, messageSize, false);
+
+ producer.send(originalMsg);
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ ClientMessage msgReceived = consumer.receive(10000);
+ msgReceived.acknowledge();
+
+ session.commit();
+
+ compareString(messageSize, msgReceived);
+
+ msgReceived.getBodyBuffer().readerIndex(0);
+
+ producer.send(msgReceived);
+
+ session.commit();
+
+ ClientMessage msgReceived2 = consumer.receive(10000);
+
+ msgReceived2.acknowledge();
+
+ compareString(messageSize, msgReceived2);
+
+ session.commit();
+
session.close();
validateNoFilesOnLargeDir();
@@ -828,6 +1015,19 @@
}
}
+ /**
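+ * @param messageSize number of body bytes to read and compare against the sample byte sequence
+ * @param msg the received message whose body buffer is checked; must not be null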
+ */
+ private void compareString(final long messageSize, ClientMessage msg)
+ {
+ assertNotNull(msg);
+ for (long i = 0; i < messageSize; i++)
+ {
+ Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
+ }
+ }
+
public void testFilePersistenceOneHugeMessage() throws Exception
{
testChunks(false,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -239,7 +239,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
callTimeout,
- HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -33,6 +33,8 @@
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
/**
*
@@ -338,6 +340,68 @@
}
+
+ public void testHugeString() throws Exception
+ {
+ int msgSize = 1024 * 1024;
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(queue1);
+
+ TextMessage m = session.createTextMessage();
+
+ StringBuffer buffer = new StringBuffer();
+ while(buffer.length() < msgSize)
+ {
+ buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+ }
+
+ final String originalString = buffer.toString();
+
+ m.setText(originalString);
+
+ buffer = null;
+
+ prod.send(m);
+
+ conn.close();
+
+ validateNoFilesOnLargeDir(1);
+
+ conn = cf.createConnection();
+
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue1);
+
+ conn.start();
+
+ TextMessage rm = (TextMessage)cons.receive(10000);
+ Assert.assertNotNull(rm);
+
+ String str = rm.getText();
+ assertEquals(originalString, str);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ validateNoFilesOnLargeDir(0);
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-04-27 00:41:55 UTC (rev 9171)
@@ -56,12 +56,29 @@
// Attributes ----------------------------------------------------
+ static int tmpFileCounter = 0;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ tmpFileCounter++;
+
+ File tmp = new File(getTestDir());
+ tmp.mkdirs();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
// Test Simple getBytes
public void testGetBytes() throws Exception
{
@@ -193,7 +210,7 @@
private File getTestFile()
{
- return new File(getTestDir(), "temp.file");
+ return new File(getTestDir(), "temp." + tmpFileCounter + ".file");
}
public void testReadDataOverCached() throws Exception
@@ -298,7 +315,49 @@
Assert.assertEquals(i, bytes[i]);
}
}
+
+ public void testSplitBufferOnFile() throws Exception
+ {
+ LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
+ 1024 * 1024,
+ 1,
+ getTestFile(),
+ 1024);
+ try
+ {
+ long count = 0;
+ for (int i = 0; i < 10; i++)
+ {
+ byte buffer[] = new byte[10240];
+ for (int j = 0; j < 10240; j++)
+ {
+ buffer[j] = getSamplebyte(count++);
+ }
+ outBuffer.addPacket(new FakePacket(1, buffer, true, false));
+ }
+
+ outBuffer.readerIndex(0);
+
+ for (int i = 0; i < 10240 * 10; i++)
+ {
+ assertEquals("position " + i, getSamplebyte(i), outBuffer.readByte());
+ }
+
+ outBuffer.readerIndex(0);
+
+ for (int i = 0; i < 10240 * 10; i++)
+ {
+ assertEquals("position " + i, getSamplebyte(i), outBuffer.readByte());
+ }
+ }
+ finally
+ {
+ outBuffer.close();
+ }
+
+ }
+
public void testStreamData() throws Exception
{
final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
@@ -727,7 +786,7 @@
public void stop(boolean waitForOnMessage) throws HornetQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // To change body of implemented methods use File | Settings | File Templates.
}
public SessionQueueQueryResponseMessage getQueueInfo()
More information about the hornetq-commits mailing list