[hornetq-commits] JBoss hornetq SVN: r10112 - in trunk: src/main/org/hornetq/core/client/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jan 10 15:30:22 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-10 15:30:22 -0500 (Mon, 10 Jan 2011)
New Revision: 10112
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
Log:
JBOAOO-5595 - Fixing large message send when the file size exceeds the range of a 32-bit integer
Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -49,10 +49,8 @@
// This may take some considerable time to create, send and consume - if it takes too long or you don't have
// enough disk space just reduce the file size here
- private final long FILE_SIZE = 256L * 1024 * 1024;
+ private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
- //private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
-
@Override
public boolean runExample() throws Exception
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -468,14 +468,13 @@
// We won't know the real size of the message since we are compressing while reading the streaming.
// This counter will be passed to the deflater to be updated for every byte read
- AtomicLong messageSize = new AtomicLong();
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter, messageSize);
+ input = new DeflaterReader(inputStreamParameter);
}
- int totalSize = 0;
+ long totalSize = 0;
while (!lastPacket)
{
@@ -518,18 +517,13 @@
if (lastPacket)
{
- if (!session.isCompressLargeMessages())
- {
- messageSize.set(totalSize);
- }
-
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, totalSize);
}
else
{
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -41,6 +41,11 @@
input = inData;
this.bytesRead = bytesRead;
}
+
+ public DeflaterReader(final InputStream inData)
+ {
+ this(inData, null);
+ }
@Override
public int read() throws IOException
@@ -103,7 +108,10 @@
}
else
{
- bytesRead.addAndGet(m);
+ if (bytesRead != null)
+ {
+ bytesRead.addAndGet(m);
+ }
deflater.setInput(readBuffer, 0, m);
}
}
More information about the hornetq-commits
mailing list