Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 17:30:28 -0500 (Fri, 17 Dec 2010)
New Revision: 10057
Modified:
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
Log:
JBPAPP-5595 - Fix the large-message buffer recreation issue over JMS, and get the message size
without recreating the buffer
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -67,6 +67,8 @@
public static final SimpleString HDR_GROUP_ID = new
SimpleString("_HQ_GROUP_ID");
public static final SimpleString HDR_LARGE_COMPRESSED = new
SimpleString("_HQ_LARGE_COMPRESSED");
+
+ public static final SimpleString HDR_LARGE_BODY_SIZE = new
SimpleString("_HQ_LARGE_SIZE");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new
SimpleString("_HQ_SCHED_DELIVERY");
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2010-12-17
22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
import org.hornetq.utils.DataConstants;
@@ -38,7 +39,7 @@
// Used only when receiving large messages
private LargeMessageController largeMessageController;
-
+
private long largeMessageSize;
// Static --------------------------------------------------------
@@ -93,7 +94,7 @@
{
largeMessageController = controller;
}
-
+
public HornetQBuffer getBodyBuffer()
{
checkBuffer();
@@ -101,15 +102,11 @@
return bodyBuffer;
}
-
public int getBodySize()
{
- checkBuffer();
- return buffer.writerIndex() - buffer.readerIndex();
+ return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
}
-
-
public LargeMessageController getLargeMessageController()
{
return largeMessageController;
@@ -160,7 +157,7 @@
return largeMessageController.waitCompletion(timeMilliseconds);
}
}
-
+
public void discardBody()
{
if (bodyBuffer != null)
@@ -173,27 +170,26 @@
}
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
+
private void checkBuffer()
{
if (bodyBuffer == null)
{
-
+
long bodySize = this.largeMessageSize + BODY_OFFSET;
if (bodySize > Integer.MAX_VALUE)
{
bodySize = Integer.MAX_VALUE;
}
createBody((int)bodySize);
-
+
bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
-
+
try
{
largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
@@ -204,14 +200,13 @@
}
}
}
-
// Inner classes -------------------------------------------------
-
+
protected class HornetQOutputStream extends OutputStream
{
HornetQBuffer bufferOut;
-
+
HornetQOutputStream(HornetQBuffer out)
{
this.bufferOut = out;
@@ -225,7 +220,7 @@
{
bufferOut.writeByte((byte)(b & 0xff));
}
-
+
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-17
22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -361,13 +362,11 @@
InputStream input = msgI.getBodyInputStream();
-
if (msgI.isServerMessage())
{
largeMessageSendServer(sendBlocking, msgI, credits);
}
- else
- if (input != null)
+ else if (input != null)
{
largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
@@ -376,7 +375,7 @@
largeMessageSendBuffered(sendBlocking, msgI, credits);
}
}
-
+
/**
* Used to send serverMessages through the bridges.
* No need to validate compression here since the message is only compressed at the
client
@@ -385,8 +384,8 @@
* @throws HornetQException
*/
private void largeMessageSendServer(final boolean sendBlocking,
- final MessageInternal msgI,
- final ClientProducerCredits credits) throws
HornetQException
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws
HornetQException
{
BodyEncoder context = msgI.getBodyEncoder();
@@ -440,8 +439,6 @@
}
}
-
-
/**
* @param sendBlocking
* @param msgI
@@ -469,11 +466,17 @@
InputStream input = inputStreamParameter;
+ // We won't know the real size of the message since we are compressing while
reading the stream.
+ // This counter will be passed to the deflater to be updated for every byte read
+ AtomicLong messageSize = new AtomicLong();
+
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter);
+ input = new DeflaterReader(inputStreamParameter, messageSize);
}
+ int totalSize = 0;
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
@@ -508,19 +511,31 @@
}
while (pos < minLargeMessageSize);
+ totalSize += pos;
+
+ final SessionSendContinuationMessage chunk;
+
if (lastPacket)
{
+
+ if (!session.isCompressLargeMessages())
+ {
+ messageSize.set(totalSize);
+ }
+
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
+
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking,
messageSize.get());
}
+ else
+ {
+ chunk = new SessionSendContinuationMessage(buff, true, false);
+ }
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(buff,
-
!lastPacket,
-
lastPacket && sendBlocking);
-
if (sendBlocking && lastPacket)
{
// When sending it blocking, only the last chunk will be blocking.
@@ -551,6 +566,5 @@
e);
}
}
-
// Inner Classes
--------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-12-17
22:03:22 UTC (rev 10056)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -458,7 +458,7 @@
{
SessionSendContinuationMessage message =
(SessionSendContinuationMessage)packet;
requiresResponse = message.isRequiresResponse();
- session.sendContinuations(message.getPacketSize(), message.getBody(),
message.isContinues());
+ session.sendContinuations(message.getPacketSize(),
message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse)
{
response = new NullResponseMessage();
Modified:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2010-12-17
22:03:22 UTC (rev 10056)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -39,6 +39,7 @@
protected byte[] body;
protected boolean continues;
+
// Static --------------------------------------------------------
Modified:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2010-12-17
22:03:22 UTC (rev 10056)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -33,7 +33,12 @@
// Attributes ----------------------------------------------------
private boolean requiresResponse;
-
+
+ /**
+ * To be sent on the last packet
+ */
+ private long messageBodySize = -1;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -54,6 +59,17 @@
this.requiresResponse = requiresResponse;
}
+ /**
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionSendContinuationMessage(final byte[] body, final boolean continues,
final boolean requiresResponse, final long messageBodySize)
+ {
+ this(body, continues, requiresResponse);
+ this.messageBodySize = messageBodySize;
+ }
+
// Public --------------------------------------------------------
/**
@@ -63,11 +79,20 @@
{
return requiresResponse;
}
+
+ public long getMessageBodySize()
+ {
+ return messageBodySize;
+ }
@Override
public void encodeRest(final HornetQBuffer buffer)
{
super.encodeRest(buffer);
+ if (!continues)
+ {
+ buffer.writeLong(messageBodySize);
+ }
buffer.writeBoolean(requiresResponse);
}
@@ -75,6 +100,10 @@
public void decodeRest(final HornetQBuffer buffer)
{
super.decodeRest(buffer);
+ if (!continues)
+ {
+ messageBodySize = buffer.readLong();
+ }
requiresResponse = buffer.readBoolean();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-17 22:03:22 UTC (rev
10056)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-17 22:30:28 UTC (rev
10057)
@@ -99,7 +99,7 @@
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void sendContinuations(int packetSize, byte[] body, boolean continues) throws
Exception;
+ void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean
continues) throws Exception;
void send(ServerMessage message, boolean direct) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-17 22:03:22
UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-17 22:30:28
UTC (rev 10057)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -1022,7 +1023,7 @@
}
}
- public void sendContinuations(final int packetSize, final byte[] body, final boolean
continues) throws Exception
+ public void sendContinuations(final int packetSize, final long messageBodySize, final
byte[] body, final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -1037,6 +1038,8 @@
if (!continues)
{
currentLargeMessage.releaseResources();
+
+ currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE,
messageBodySize);
doSend(currentLargeMessage, false);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-12-17 22:03:22 UTC
(rev 10056)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-12-17 22:30:28 UTC
(rev 10057)
@@ -407,8 +407,6 @@
public void doBeforeReceive() throws Exception
{
bodyLength = message.getBodySize();
-
- super.doBeforeReceive();
}
// HornetQRAMessage overrides ----------------------------------------
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-17 22:03:22 UTC (rev
10056)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-17 22:30:28 UTC (rev
10057)
@@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
/**
@@ -27,24 +28,28 @@
*/
public class DeflaterReader extends InputStream
{
- private Deflater deflater = new Deflater();
+ private final Deflater deflater = new Deflater();
private boolean isFinished = false;
private boolean compressDone = false;
private InputStream input;
- public DeflaterReader(InputStream inData)
+ private final AtomicLong bytesRead;
+
+ public DeflaterReader(final InputStream inData, final AtomicLong bytesRead)
{
input = inData;
+ this.bytesRead = bytesRead;
}
+ @Override
public int read() throws IOException
{
byte[] buffer = new byte[1];
int n = read(buffer, 0, 1);
if (n == 1)
{
- return (int)buffer[0] & 0xFF;
+ return buffer[0] & 0xFF;
}
if (n == -1 || n == 0)
{
@@ -62,7 +67,7 @@
* @throws IOException
*/
@Override
- public int read(byte[] buffer, int offset, int len) throws IOException
+ public int read(final byte[] buffer, int offset, int len) throws IOException
{
if (compressDone)
{
@@ -98,6 +103,7 @@
}
else
{
+ bytesRead.addAndGet(m);
deflater.setInput(readBuffer, 0, m);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-17
22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -940,7 +940,7 @@
server.start();
- locator.setMinLargeMessageSize(111);
+ locator.setMinLargeMessageSize(200);
locator.setCacheLargeMessagesClient(true);
Modified: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-17
22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-17
22:30:28 UTC (rev 10057)
@@ -16,6 +16,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import org.hornetq.tests.util.UnitTestCase;
@@ -39,7 +40,8 @@
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
- DeflaterReader reader = new DeflaterReader(inputStream);
+ AtomicLong counter = new AtomicLong(0);
+ DeflaterReader reader = new DeflaterReader(inputStream, counter);
ArrayList<Integer> zipHolder = new ArrayList<Integer>();
int b = reader.read();
@@ -50,6 +52,8 @@
b = reader.read();
}
+ assertEquals(input.length, counter.get());
+
byte[] allCompressed = new byte[zipHolder.size()];
for (int i = 0; i < allCompressed.length; i++)
{
@@ -71,8 +75,9 @@
byte[] input = inputString.getBytes("UTF-8");
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
-
- DeflaterReader reader = new DeflaterReader(inputStream);
+ AtomicLong counter = new AtomicLong(0);
+
+ DeflaterReader reader = new DeflaterReader(inputStream, counter);
byte[] buffer = new byte[7];
ArrayList<Integer> zipHolder = new ArrayList<Integer>();
@@ -87,6 +92,8 @@
n = reader.read(buffer);
}
+ assertEquals(input.length, counter.get());
+
byte[] allCompressed = new byte[zipHolder.size()];
for (int i = 0; i < allCompressed.length; i++)
{