Author: clebert.suconic(a)jboss.com
Date: 2009-11-25 17:20:57 -0500 (Wed, 25 Nov 2009)
New Revision: 8404
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Fix for LargeMessages
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -16,15 +16,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
/**
@@ -147,17 +147,6 @@
*/
// FIXME - only used for large messages - move it!
- public long getLargeBodySize()
- {
- if (largeMessage)
- {
- return ((LargeMessageBuffer)getWholeBuffer()).getSize();
- }
- else
- {
- return this.getBodySize();
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
@@ -172,7 +161,9 @@
{
try
{
- out.write(this.getWholeBuffer().toByteBuffer().array());
+ byte readBuffer[] = new byte[getBodySize()];
+ getBodyBuffer().readBytes(readBuffer);
+ out.write(readBuffer);
}
catch (IOException e)
{
@@ -249,6 +240,50 @@
bodyBuffer.setBuffer(buffer);
}
}
+
+ public BodyEncoder getBodyEncoder() throws HornetQException
+ {
+ return new DecodingContext();
+ }
+
+ private final class DecodingContext implements BodyEncoder
+ {
+ private int lastPos = 0;
+
+ public DecodingContext()
+ {
+ }
+
+ public void open()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public long getLargeBodySize()
+ {
+ return buffer.writerIndex();
+ }
+
+ public int encode(final ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
+
+ public int encode(final HornetQBuffer bufferOut, final int size)
+ {
+ byte[] bytes = new byte[size];
+ getWholeBuffer().readBytes(bytes);
+ bufferOut.writeBytes(bytes, 0, size);
+ return size;
+ }
+ }
+
+
+
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -232,8 +232,6 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend :
blockOnNonPersistentSend;
- SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
-
session.workDone();
boolean isLarge;
@@ -251,13 +249,18 @@
{
largeMessageSend(sendBlocking, msg, theCredits);
}
- else if (sendBlocking)
- {
- channel.sendBlocking(message);
- }
else
{
- channel.send(message);
+ SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+
+ if (sendBlocking)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.send(message);
+ }
}
try
@@ -345,10 +348,12 @@
final Message msg,
final ClientProducerCredits credits) throws HornetQException
{
- final long bodySize = msg.getLargeBodySize();
-
BodyEncoder context = msg.getBodyEncoder();
+
+ final long bodySize = context.getLargeBodySize();
+
+
context.open();
try
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-25
20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -35,4 +35,6 @@
int encode(ByteBuffer bufferRead) throws HornetQException;
int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+
+ long getLargeBodySize();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25
20:11:08 UTC (rev 8403)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -18,6 +18,7 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -198,10 +199,8 @@
void decodeHeadersAndProperties(HornetQBuffer buffer);
- long getLargeBodySize();
+ BodyEncoder getBodyEncoder() throws HornetQException;
- BodyEncoder getBodyEncoder();
-
/** Get the InputStream used on a message that will be sent over a producer */
InputStream getBodyInputStream();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -142,8 +142,12 @@
protected MessageImpl(final long messageID, final int initialMessageBufferSize)
{
+ this(initialMessageBufferSize);
+ }
+
+ protected MessageImpl(final int initialMessageBufferSize)
+ {
this();
- this.messageID = messageID;
createBody(initialMessageBufferSize);
}
@@ -165,11 +169,15 @@
endOfBodyPosition = other.endOfBodyPosition;
endOfMessagePosition = other.endOfMessagePosition;
copied = other.copied;
-
- // We need to copy the underlying buffer too, since the different messsages thereafter might have different
- // properties set on them, making their encoding different
- buffer = other.buffer.copy(0, other.buffer.capacity());
- buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+
+ if (other.buffer != null)
+ {
+ createBody(other.buffer.capacity());
+ // We need to copy the underlying buffer too, since the different messsages thereafter might have different
+ // properties set on them, making their encoding different
+ buffer = other.buffer.copy(0, other.buffer.capacity());
+ buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
+ }
}
// Message implementation ----------------------------------------
@@ -779,7 +787,7 @@
return buffer;
}
- public BodyEncoder getBodyEncoder()
+ public BodyEncoder getBodyEncoder() throws HornetQException
{
return new DecodingContext();
}
@@ -889,6 +897,11 @@
public void close()
{
}
+
+ public long getLargeBodySize()
+ {
+ return buffer.writerIndex();
+ }
public int encode(final ByteBuffer bufferRead) throws HornetQException
{
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -125,20 +125,6 @@
}
@Override
- public synchronized long getLargeBodySize()
- {
- try
- {
- validateFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- return bodySize;
- }
-
- @Override
public synchronized int getEncodeSize()
{
return getHeadersAndPropertiesEncodeSize();
@@ -146,15 +132,15 @@
@Override
public void encode(final HornetQBuffer buffer)
- {
+ {
super.encodeHeadersAndProperties(buffer);
}
-
+
@Override
public void decode(final HornetQBuffer buffer)
- {
+ {
file = null;
-
+
super.decodeHeadersAndProperties(buffer);
}
@@ -174,8 +160,9 @@
}
@Override
- public BodyEncoder getBodyEncoder()
+ public BodyEncoder getBodyEncoder() throws HornetQException
{
+ validateFile();
return new DecodingContext();
}
@@ -312,22 +299,30 @@
// Private -------------------------------------------------------
- private synchronized void validateFile() throws Exception
+ private synchronized void validateFile() throws HornetQException
{
- if (file == null)
+ try
{
- if (messageID <= 0)
+ if (file == null)
{
- throw new RuntimeException("MessageID not set on LargeMessage");
+ if (messageID <= 0)
+ {
+ throw new RuntimeException("MessageID not set on LargeMessage");
+ }
+
+ file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+
+ file.open();
+
+ bodySize = file.size();
+
}
-
- file = storageManager.createFileForLargeMessage(getMessageID(), durable);
-
- file.open();
-
- bodySize = file.size();
-
}
+ catch (Exception e)
+ {
+ // TODO: There is an IO_ERROR on trunk now, this should be used here instead
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+ }
}
/* (non-Javadoc)
@@ -415,5 +410,13 @@
return bytesRead;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.BodyEncoder#getLargeBodySize()
+ */
+ public long getLargeBodySize()
+ {
+ return bodySize;
+ }
}
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -57,18 +57,13 @@
*/
public synchronized void addBytes(final byte[] bytes)
{
-// HornetQBuffer buffer = getBuffer();
-//
-// if (buffer != null)
-// {
- // expand the buffer
- buffer.writeBytes(bytes);
-// }
-// else
-// {
-// // Reuse the initial byte array on the buffer construction
-// setBuffer(ChannelBuffers.dynamicBuffer(bytes));
-// }
+ if (buffer == null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytes.length);
+ }
+
+ // expand the buffer
+ buffer.writeBytes(bytes);
}
/* (non-Javadoc)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendLargeMessage.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -32,9 +32,6 @@
/** Used only if largeMessage */
private byte[] largeMessageHeader;
- /** We need to set the MessageID when replicating this on the server */
- private long largeMessageId = -1;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -58,28 +55,11 @@
return largeMessageHeader;
}
- /**
- * @return the largeMessageId
- */
- public long getLargeMessageID()
- {
- return largeMessageId;
- }
-
- /**
- * @param largeMessageId the largeMessageId to set
- */
- public void setLargeMessageID(long id)
- {
- this.largeMessageId = id;
- }
-
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeInt(largeMessageHeader.length);
buffer.writeBytes(largeMessageHeader);
- buffer.writeLong(largeMessageId);
}
@Override
@@ -90,8 +70,6 @@
largeMessageHeader = new byte[largeMessageLength];
buffer.readBytes(largeMessageHeader);
-
- largeMessageId = buffer.readLong();
}
// Package protected ---------------------------------------------
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -13,7 +13,6 @@
package org.hornetq.core.server;
-import org.hornetq.core.message.BodyEncoder;
/**
* A LargeMessage
@@ -36,8 +35,6 @@
/** Close the files if opened */
void releaseResources();
- long getLargeBodySize();
-
void deleteFile() throws Exception;
void incrementDelayDeletionCount();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -638,7 +638,7 @@
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
private final class LargeMessageDeliverer
{
- private final long sizePendingLargeMessage;
+ private long sizePendingLargeMessage;
private LargeServerMessage largeMessage;
@@ -657,8 +657,6 @@
largeMessage.incrementDelayDeletionCount();
- sizePendingLargeMessage = largeMessage.getLargeBodySize();
-
this.ref = ref;
}
@@ -684,14 +682,16 @@
largeMessage.encodeHeadersAndProperties(headerBuffer);
+ context = largeMessage.getBodyEncoder();
+
+ sizePendingLargeMessage = context.getLargeBodySize();
+
SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
headerBuffer.toByteBuffer()
.array(),
- largeMessage.getLargeBodySize(),
+ context.getLargeBodySize(),
ref.getDeliveryCount());
- context = largeMessage.getBodyEncoder();
-
context.open();
sentInitialPacket = true;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -64,6 +64,11 @@
{
super(messageID, initialMessageBufferSize);
}
+
+ protected ServerMessageImpl(final int initialMessageBufferSize)
+ {
+ super(initialMessageBufferSize);
+ }
/*
* Copy constructor
@@ -144,11 +149,6 @@
return false;
}
- public long getLargeBodySize()
- {
- return -1;
- }
-
public int getMemoryEstimate()
{
if (memoryEstimate == -1)
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -18,11 +18,6 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import junit.framework.AssertionFailedError;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -53,13 +48,6 @@
*/
public class LargeMessageTest extends LargeMessageTestBase
{
- public static Test suite()
- {
- TestSuite suite = new TestSuite();
-
- return suite;
- }
-
// Constants -----------------------------------------------------
final static int RECEIVE_WAIT_TIME = 60000;
@@ -788,14 +776,18 @@
producer2.send(msg1);
+ boolean failed = false;
+
try
{
producer2.send(msg1);
- fail("Expected Exception");
}
catch (Throwable e)
{
+ failed = true;
}
+
+ assertTrue("Exception expected", failed);
session.commit();
@@ -926,7 +918,7 @@
false,
true,
true,
- 100,
+ 2,
LARGE_MESSAGE_SIZE,
RECEIVE_WAIT_TIME,
0);
@@ -2211,22 +2203,18 @@
ClientMessage message = null;
- HornetQBuffer body = null;
-
for (int i = 0; i < 100; i++)
{
message = session.createClientMessage(true);
+ // TODO: Why do I need to reset the writerIndex?
+ message.getBodyBuffer().writerIndex(0);
+
for (int j = 1; j <= numberOfBytes; j++)
{
message.getBodyBuffer().writeInt(j);
}
- if (i == 0)
- {
- body = message.getBodyBuffer();
- }
-
producer.send(message);
}
@@ -2262,17 +2250,12 @@
assertNotNull(message2);
- try
+ message.getBodyBuffer().readerIndex(0);
+
+ for (int j = 1; j <= numberOfBytes; j++)
{
- assertEqualsByteArrays(body.writerIndex(), body.toByteBuffer().array(), message2.getBodyBuffer().toByteBuffer().
- array());
+ assertEquals(j, message.getBodyBuffer().readInt());
}
- catch (AssertionFailedError e)
- {
- log.info("Expected buffer:" + dumbBytesHex(body.toByteBuffer().array(), 40));
- log.info("Arriving buffer:" + dumbBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40));
- throw e;
- }
}
consumer.close();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -350,7 +350,6 @@
HornetQBuffer buffer = message.getBodyBuffer();
buffer.resetReaderIndex();
- assertEquals(numberOfBytes, buffer.writerIndex());
for (long b = 0; b < numberOfBytes; b++)
{
if (b % (1024l * 1024l) == 0)
@@ -360,6 +359,15 @@
assertEquals(getSamplebyte(b), buffer.readByte());
}
+
+ try
+ {
+ buffer.readByte();
+ fail("Supposed to throw an exception");
+ }
+ catch (Exception e)
+ {
+ }
}
}
catch (Throwable e)
@@ -396,8 +404,6 @@
assertNotNull(message);
- log.debug("Message: " + i);
-
System.currentTimeMillis();
if (delayDelivery > 0)
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25
20:11:08 UTC (rev 8403)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-25
22:20:57 UTC (rev 8404)
@@ -288,7 +288,7 @@
assertEquals(0, manager.getActiveTokens().size());
- ServerMessage msg = new ServerMessageImpl(1, 10);
+ ServerMessage msg = new ServerMessageImpl(1, 1024);
SimpleString dummy = new SimpleString("dummy");
msg.setDestination(dummy);