Author: clebert.suconic(a)jboss.com
Date: 2011-01-10 15:30:22 -0500 (Mon, 10 Jan 2011)
New Revision: 10112
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
Log:
JBOAOO-5595 - Fixing large message send on file-size > 32bits integers
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
---
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-08
04:28:52 UTC (rev 10111)
+++
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-10
20:30:22 UTC (rev 10112)
@@ -49,10 +49,8 @@
// This may take some considerable time to create, send and consume - if it takes too
long or you don't have
// enough disk space just reduce the file size here
- private final long FILE_SIZE = 256L * 1024 * 1024;
+ private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 10 GiB message
- //private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
-
@Override
public boolean runExample() throws Exception
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-08
04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-10
20:30:22 UTC (rev 10112)
@@ -468,14 +468,13 @@
// We won't know the real size of the message since we are compressing while
reading the streaming.
// This counter will be passed to the deflater to be updated for every byte read
- AtomicLong messageSize = new AtomicLong();
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter, messageSize);
+ input = new DeflaterReader(inputStreamParameter);
}
- int totalSize = 0;
+ long totalSize = 0;
while (!lastPacket)
{
@@ -518,18 +517,13 @@
if (lastPacket)
{
- if (!session.isCompressLargeMessages())
- {
- messageSize.set(totalSize);
- }
-
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking,
messageSize.get());
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking,
totalSize);
}
else
{
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-08 04:28:52 UTC (rev
10111)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-10 20:30:22 UTC (rev
10112)
@@ -41,6 +41,11 @@
input = inData;
this.bytesRead = bytesRead;
}
+
+ public DeflaterReader(final InputStream inData)
+ {
+ this(inData, null);
+ }
@Override
public int read() throws IOException
@@ -103,7 +108,10 @@
}
else
{
- bytesRead.addAndGet(m);
+ if (bytesRead != null)
+ {
+ bytesRead.addAndGet(m);
+ }
deflater.setInput(readBuffer, 0, m);
}
}
Show replies by date